Diffstat (limited to 'src')
-rw-r--r--  src/clm/api/clusterhandler_test.go                              |   6
-rw-r--r--  src/clm/pkg/cluster/cluster.go                                  |  14
-rw-r--r--  src/monitor/build/Dockerfile                                    |  21
-rw-r--r--  src/monitor/deploy/cluster_role.yaml                            |  72
-rw-r--r--  src/monitor/deploy/clusterrole_binding.yaml                     |  12
-rwxr-xr-x  src/monitor/deploy/monitor-cleanup.sh                           |   6
-rwxr-xr-x  src/monitor/deploy/monitor-deploy.sh                            |   6
-rw-r--r--  src/monitor/deploy/operator.yaml                                |  14
-rw-r--r--  src/monitor/deploy/role.yaml                                    |  12
-rw-r--r--  src/monitor/go.mod                                              |   2
-rw-r--r--  src/monitor/pkg/apis/k8splugin/v1alpha1/types.go                |   4
-rw-r--r--  src/ncm/go.mod                                                  |   2
-rw-r--r--  src/ncm/internal/grpc/rsyncclient.go                            |  41
-rw-r--r--  src/ncm/pkg/scheduler/scheduler.go                              |  43
-rw-r--r--  src/orchestrator/api/api.go                                     |   2
-rw-r--r--  src/orchestrator/api/instantiation_handler.go                   |  46
-rw-r--r--  src/orchestrator/go.mod                                         |  15
-rw-r--r--  src/orchestrator/pkg/appcontext/appcontext.go                   |  56
-rw-r--r--  src/orchestrator/pkg/appcontext/appcontext_test.go              |   4
-rw-r--r--  src/orchestrator/pkg/grpc/installappclient/client.go            |  66
-rw-r--r--  src/orchestrator/pkg/module/deployment_intent_groups.go         |  40
-rw-r--r--  src/orchestrator/pkg/module/instantiation.go                    | 138
-rw-r--r--  src/orchestrator/pkg/module/instantiation_appcontext_helper.go  |  43
-rw-r--r--  src/orchestrator/pkg/module/instantiation_scheduler_helper.go   |  16
-rw-r--r--  src/orchestrator/pkg/rtcontext/rtcontext.go                     |  26
-rw-r--r--  src/rsync/go.mod                                                |   5
-rw-r--r--  src/rsync/pkg/connector/connector.go                            |   6
-rw-r--r--  src/rsync/pkg/context/context.go                                | 400
-rw-r--r--  src/rsync/pkg/resource/resource.go                              |  91
-rw-r--r--  src/rsync/pkg/status/status.go                                  | 222
30 files changed, 1048 insertions(+), 383 deletions(-)
diff --git a/src/clm/api/clusterhandler_test.go b/src/clm/api/clusterhandler_test.go
index 7095e87c..4bbc91b1 100644
--- a/src/clm/api/clusterhandler_test.go
+++ b/src/clm/api/clusterhandler_test.go
@@ -102,12 +102,12 @@ func (m *mockClusterManager) GetClusterContent(provider, name string) (cluster.C
return m.ClusterContentItems[0], nil
}
-func (m *mockClusterManager) GetClusterContext(provider, name string) (appcontext.AppContext, error) {
+func (m *mockClusterManager) GetClusterContext(provider, name string) (appcontext.AppContext, string, error) {
if m.Err != nil {
- return appcontext.AppContext{}, m.Err
+ return appcontext.AppContext{}, "", m.Err
}
- return m.ClusterContextItems[0], nil
+ return m.ClusterContextItems[0], "", nil
}
func (m *mockClusterManager) GetClusters(provider string) ([]cluster.Cluster, error) {
diff --git a/src/clm/pkg/cluster/cluster.go b/src/clm/pkg/cluster/cluster.go
index 06faafd2..ac7f31f7 100644
--- a/src/clm/pkg/cluster/cluster.go
+++ b/src/clm/pkg/cluster/cluster.go
@@ -101,7 +101,7 @@ type ClusterManager interface {
CreateCluster(provider string, pr Cluster, qr ClusterContent) (Cluster, error)
GetCluster(provider, name string) (Cluster, error)
GetClusterContent(provider, name string) (ClusterContent, error)
- GetClusterContext(provider, name string) (appcontext.AppContext, error)
+ GetClusterContext(provider, name string) (appcontext.AppContext, string, error)
GetClusters(provider string) ([]Cluster, error)
GetClustersWithLabel(provider, label string) ([]string, error)
DeleteCluster(provider, name string) error
@@ -310,7 +310,7 @@ func (v *ClusterClient) GetClusterContent(provider, name string) (ClusterContent
}
// GetClusterContext returns the AppContext for corresponding provider and name
-func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.AppContext, error) {
+func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.AppContext, string, error) {
//Construct key and tag to select the entry
key := ClusterKey{
ClusterProviderName: provider,
@@ -319,7 +319,7 @@ func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.App
value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagContext)
if err != nil {
- return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get Cluster Context")
+ return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Get Cluster Context")
}
//value is a byte array
@@ -328,12 +328,12 @@ func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.App
var cc appcontext.AppContext
_, err = cc.LoadAppContext(ctxVal)
if err != nil {
- return appcontext.AppContext{}, pkgerrors.Wrap(err, "Reinitializing Cluster AppContext")
+ return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Reinitializing Cluster AppContext")
}
- return cc, nil
+ return cc, ctxVal, nil
}
- return appcontext.AppContext{}, pkgerrors.New("Error getting Cluster AppContext")
+ return appcontext.AppContext{}, "", pkgerrors.New("Error getting Cluster AppContext")
}
// GetClusters returns all the Clusters for corresponding provider
@@ -393,7 +393,7 @@ func (v *ClusterClient) DeleteCluster(provider, name string) error {
ClusterProviderName: provider,
ClusterName: name,
}
- _, err := v.GetClusterContext(provider, name)
+ _, _, err := v.GetClusterContext(provider, name)
if err == nil {
return pkgerrors.Errorf("Cannot delete cluster until context is deleted: %v, %v", provider, name)
}
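
For orientation, a minimal sketch of consuming the widened signature: GetClusterContext now also returns the raw context id stored under the cluster's context tag, which is the string later handed to rsync over gRPC (hypothetical caller, not part of this commit).

package example

import (
	"fmt"

	cluster "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
)

// checkClusterContext sketches the new three-value return: the loaded
// AppContext, the raw context id string, and an error. An error here
// means no network intents are currently applied for the cluster.
func checkClusterContext(provider, name string) error {
	ac, ctxVal, err := cluster.NewClusterClient().GetClusterContext(provider, name)
	if err != nil {
		return err
	}
	fmt.Printf("cluster %v/%v has active context %v (appcontext %v)\n",
		provider, name, ctxVal, ac)
	return nil
}
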
diff --git a/src/monitor/build/Dockerfile b/src/monitor/build/Dockerfile
index 812eb47e..9ecff169 100644
--- a/src/monitor/build/Dockerfile
+++ b/src/monitor/build/Dockerfile
@@ -1,15 +1,16 @@
-FROM registry.access.redhat.com/ubi7/ubi-minimal:latest
+FROM golang:1.14.1
-ENV OPERATOR=/usr/local/bin/monitor \
- USER_UID=1001 \
- USER_NAME=monitor
+WORKDIR /go/src/github.com/onap/multicloud-k8s/src/monitor
+COPY ./ ./
+RUN make all
-# install operator binary
-COPY _output/bin/monitor ${OPERATOR}
+FROM ubuntu:16.04
-COPY bin /usr/local/bin
-RUN /usr/local/bin/user_setup
+WORKDIR /opt/monitor
+RUN groupadd -r monitor && useradd -r -g monitor monitor
+RUN chown monitor:monitor /opt/monitor -R
+COPY --chown=monitor --from=0 /go/src/github.com/onap/multicloud-k8s/src/monitor/monitor ./
-ENTRYPOINT ["/usr/local/bin/entrypoint"]
+USER monitor
+ENTRYPOINT ["/opt/monitor/monitor"]
-USER ${USER_UID}
diff --git a/src/monitor/deploy/cluster_role.yaml b/src/monitor/deploy/cluster_role.yaml
new file mode 100644
index 00000000..0732e8d3
--- /dev/null
+++ b/src/monitor/deploy/cluster_role.yaml
@@ -0,0 +1,72 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ creationTimestamp: null
+ name: monitor
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - services
+ - endpoints
+ - persistentvolumeclaims
+ - events
+ - configmaps
+ - secrets
+ verbs:
+ - '*'
+- apiGroups:
+ - apps
+ resources:
+ - deployments
+ - daemonsets
+ - replicasets
+ - statefulsets
+ verbs:
+ - '*'
+- apiGroups:
+ - monitoring.coreos.com
+ resources:
+ - servicemonitors
+ verbs:
+ - get
+ - create
+- apiGroups:
+ - apps
+ resourceNames:
+ - monitor
+ resources:
+ - deployments/finalizers
+ verbs:
+ - update
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ verbs:
+ - get
+- apiGroups:
+ - apps
+ resources:
+ - replicasets
+ verbs:
+ - get
+- apiGroups:
+ - k8splugin.io
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - batch
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - extensions
+ resources:
+ - '*'
+ verbs:
+ - '*'
diff --git a/src/monitor/deploy/clusterrole_binding.yaml b/src/monitor/deploy/clusterrole_binding.yaml
new file mode 100644
index 00000000..73e74039
--- /dev/null
+++ b/src/monitor/deploy/clusterrole_binding.yaml
@@ -0,0 +1,12 @@
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: monitor
+subjects:
+- kind: ServiceAccount
+ name: monitor
+ namespace: default
+roleRef:
+ kind: ClusterRole
+ name: monitor
+ apiGroup: rbac.authorization.k8s.io
diff --git a/src/monitor/deploy/monitor-cleanup.sh b/src/monitor/deploy/monitor-cleanup.sh
new file mode 100755
index 00000000..21725073
--- /dev/null
+++ b/src/monitor/deploy/monitor-cleanup.sh
@@ -0,0 +1,6 @@
+kubectl delete rolebinding monitor
+kubectl delete clusterrolebinding monitor
+kubectl delete role monitor
+kubectl delete clusterrole monitor
+kubectl delete serviceaccount monitor
+kubectl delete deploy monitor
diff --git a/src/monitor/deploy/monitor-deploy.sh b/src/monitor/deploy/monitor-deploy.sh
new file mode 100755
index 00000000..47c7120f
--- /dev/null
+++ b/src/monitor/deploy/monitor-deploy.sh
@@ -0,0 +1,6 @@
+kubectl apply -f role.yaml
+kubectl apply -f cluster_role.yaml
+kubectl apply -f role_binding.yaml
+kubectl apply -f clusterrole_binding.yaml
+kubectl apply -f service_account.yaml
+kubectl apply -f operator.yaml
diff --git a/src/monitor/deploy/operator.yaml b/src/monitor/deploy/operator.yaml
index a06c07d3..93e4522c 100644
--- a/src/monitor/deploy/operator.yaml
+++ b/src/monitor/deploy/operator.yaml
@@ -3,30 +3,28 @@ kind: Deployment
metadata:
name: monitor
labels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
spec:
replicas: 1
selector:
matchLabels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
template:
metadata:
labels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
spec:
serviceAccountName: monitor
containers:
- name: monitor
# Replace this with the built image name
- image: k8splugin.io/monitor:latest
+ image: ewmduck/monitor:latest
command:
- - monitor
+ - /opt/monitor/monitor
imagePullPolicy: IfNotPresent
env:
- name: WATCH_NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
+ value: ""
- name: POD_NAME
valueFrom:
fieldRef:
diff --git a/src/monitor/deploy/role.yaml b/src/monitor/deploy/role.yaml
index 4d0fd1b6..c48141ac 100644
--- a/src/monitor/deploy/role.yaml
+++ b/src/monitor/deploy/role.yaml
@@ -58,3 +58,15 @@ rules:
- '*'
verbs:
- '*'
+- apiGroups:
+ - batch
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - extensions
+ resources:
+ - '*'
+ verbs:
+ - '*'
diff --git a/src/monitor/go.mod b/src/monitor/go.mod
index ec48d268..6eff59af 100644
--- a/src/monitor/go.mod
+++ b/src/monitor/go.mod
@@ -32,6 +32,4 @@ replace (
sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde
)
-// Remove hg dependency using this mirror
replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.9.0
-
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
index 231f226e..064591fc 100644
--- a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
@@ -15,8 +15,8 @@ import (
// +kubebuilder:subresource:status
// +genclient
type ResourceBundleState struct {
- metav1.TypeMeta `json:",inline"`
- metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+ metav1.TypeMeta `json:",inline" yaml:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata" yaml:"metadata"`
Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
diff --git a/src/ncm/go.mod b/src/ncm/go.mod
index 233d8880..d3d2924a 100644
--- a/src/ncm/go.mod
+++ b/src/ncm/go.mod
@@ -5,6 +5,8 @@ require (
github.com/gorilla/handlers v1.3.0
github.com/gorilla/mux v1.7.3
github.com/k8snetworkplumbingwg/network-attachment-definition-client v0.0.0-20200127152046-0ee521d56061
+ github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000
+ github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4
github.com/pkg/errors v0.8.1
google.golang.org/grpc v1.27.1
gopkg.in/yaml.v2 v2.2.8
diff --git a/src/ncm/internal/grpc/rsyncclient.go b/src/ncm/internal/grpc/rsyncclient.go
deleted file mode 100644
index 5eb870a7..00000000
--- a/src/ncm/internal/grpc/rsyncclient.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Copyright 2020 Intel Corporation.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package grpc
-
-import (
- log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc"
- controller "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller"
-)
-
-const RsyncName = "rsync"
-
-// InitRsyncClient initializes connctions to the Resource Synchronizer serivice
-func InitRsyncClient() bool {
- client := controller.NewControllerClient()
-
- vals, _ := client.GetControllers()
- found := false
- for _, v := range vals {
- if v.Metadata.Name == RsyncName {
- log.Info("Initializing RPC connection to resource synchronizer", log.Fields{
- "Controller": v.Metadata.Name,
- })
- rpc.UpdateRpcConn(v.Metadata.Name, v.Spec.Host, v.Spec.Port)
- found = true
- break
- }
- }
- return found
-}
diff --git a/src/ncm/pkg/scheduler/scheduler.go b/src/ncm/pkg/scheduler/scheduler.go
index 29d67662..4886a67e 100644
--- a/src/ncm/pkg/scheduler/scheduler.go
+++ b/src/ncm/pkg/scheduler/scheduler.go
@@ -17,20 +17,16 @@
package scheduler
import (
- "context"
"encoding/json"
- "time"
clusterPkg "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
- "github.com/onap/multicloud-k8s/src/ncm/internal/grpc"
oc "github.com/onap/multicloud-k8s/src/ncm/internal/ovncontroller"
ncmtypes "github.com/onap/multicloud-k8s/src/ncm/pkg/module/types"
nettypes "github.com/onap/multicloud-k8s/src/ncm/pkg/networkintents/types"
appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/grpc/installappclient"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc"
- installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
pkgerrors "github.com/pkg/errors"
)
@@ -63,7 +59,7 @@ func NewSchedulerClient() *SchedulerClient {
// Apply Network Intents associated with a cluster
func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) error {
- _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
+ _, _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
if err == nil {
return pkgerrors.Errorf("Cluster network intents have already been applied: %v, %v", clusterProvider, cluster)
}
@@ -157,30 +153,9 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
}
// call resource synchronizer to instantiate the CRs in the cluster
- conn := rpc.GetRpcConn(grpc.RsyncName)
- if conn == nil {
- grpc.InitRsyncClient()
- conn = rpc.GetRpcConn(grpc.RsyncName)
- }
-
- var rpcClient installpb.InstallappClient
- var installRes *installpb.InstallAppResponse
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- if conn != nil {
- rpcClient = installpb.NewInstallappClient(conn)
- installReq := new(installpb.InstallAppRequest)
- installReq.AppContext = ctxVal.(string)
- installRes, err = rpcClient.InstallApp(ctx, installReq)
- if err == nil {
- log.Info("Response from InstappApp GRPC call", log.Fields{
- "Succeeded": installRes.AppContextInstalled,
- "Message": installRes.AppContextInstallMessage,
- })
- }
- } else {
- return pkgerrors.Errorf("InstallApp Failed - Could not get InstallAppClient: %v", grpc.RsyncName)
+ err = installappclient.InvokeInstallApp(ctxVal.(string))
+ if err != nil {
+ return err
}
return nil
@@ -188,12 +163,16 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
// Terminate Network Intents associated with a cluster
func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster string) error {
- context, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
+ context, ctxVal, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
if err != nil {
return pkgerrors.Wrapf(err, "Error finding AppContext for cluster: %v, %v", clusterProvider, cluster)
}
- // TODO: call resource synchronizer to terminate the CRs in the cluster
+ // call resource synchronizer to terminate the CRs in the cluster
+ err = installappclient.InvokeUninstallApp(ctxVal)
+ if err != nil {
+ return err
+ }
// remove the app context
cleanuperr := context.DeleteCompositeApp()
diff --git a/src/orchestrator/api/api.go b/src/orchestrator/api/api.go
index 2470a1be..5abbb96d 100644
--- a/src/orchestrator/api/api.go
+++ b/src/orchestrator/api/api.go
@@ -180,6 +180,8 @@ func NewRouter(projectClient moduleLib.ProjectManager,
}
router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/instantiate", instantiationHandler.instantiateHandler).Methods("POST")
+ router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/terminate", instantiationHandler.terminateHandler).Methods("POST")
+ router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/status", instantiationHandler.statusHandler).Methods("GET")
return router
}
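
A hedged sketch of exercising the two new routes from a client; the /v2 prefix, host and port are assumptions about how NewRouter is mounted, and p1/ca1/v1/dig1 are illustrative names.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	base := "http://localhost:9015/v2/projects/p1/composite-apps/ca1/v1" +
		"/deployment-intent-groups/dig1"

	// POST .../terminate asks the orchestrator to uninstall via rsync.
	resp, err := http.Post(base+"/terminate", "application/json", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("terminate:", resp.Status) // expect 202 Accepted
	resp.Body.Close()

	// GET .../status returns the per-cluster, per-app status as JSON.
	resp, err = http.Get(base + "/status")
	if err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println("status:", resp.Status, string(body))
}
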
diff --git a/src/orchestrator/api/instantiation_handler.go b/src/orchestrator/api/instantiation_handler.go
index c95785f2..ce50e5b8 100644
--- a/src/orchestrator/api/instantiation_handler.go
+++ b/src/orchestrator/api/instantiation_handler.go
@@ -17,9 +17,11 @@
package api
import (
+ "encoding/json"
+ "net/http"
+
"github.com/gorilla/mux"
moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module"
- "net/http"
)
/* Used to store backend implementation objects
@@ -45,3 +47,45 @@ func (h instantiationHandler) instantiateHandler(w http.ResponseWriter, r *http.
w.WriteHeader(http.StatusAccepted)
}
+
+func (h instantiationHandler) terminateHandler(w http.ResponseWriter, r *http.Request) {
+
+ vars := mux.Vars(r)
+ p := vars["project-name"]
+ ca := vars["composite-app-name"]
+ v := vars["composite-app-version"]
+ di := vars["deployment-intent-group-name"]
+
+ iErr := h.client.Terminate(p, ca, v, di)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusAccepted)
+
+}
+
+func (h instantiationHandler) statusHandler(w http.ResponseWriter, r *http.Request) {
+
+ vars := mux.Vars(r)
+ p := vars["project-name"]
+ ca := vars["composite-app-name"]
+ v := vars["composite-app-version"]
+ di := vars["deployment-intent-group-name"]
+
+ status, iErr := h.client.Status(p, ca, v, di)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ iErr = json.NewEncoder(w).Encode(status)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+
+}
diff --git a/src/orchestrator/go.mod b/src/orchestrator/go.mod
index 223dc068..3f14f00b 100644
--- a/src/orchestrator/go.mod
+++ b/src/orchestrator/go.mod
@@ -2,17 +2,12 @@ module github.com/onap/multicloud-k8s/src/orchestrator
require (
github.com/MakeNowJust/heredoc v1.0.0 // indirect
- github.com/Masterminds/goutils v1.1.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1
github.com/coreos/etcd v3.3.12+incompatible
- github.com/cyphar/filepath-securejoin v0.2.2 // indirect
github.com/docker/docker v1.13.1 // indirect
- github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect
- github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/ghodss/yaml v1.0.0
- github.com/gobwas/glob v0.2.3 // indirect
github.com/golang/protobuf v1.4.1
github.com/gorilla/handlers v1.3.0
github.com/gorilla/mux v1.7.3
@@ -21,34 +16,32 @@ require (
github.com/lib/pq v1.6.0 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
- github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000
+ github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200630152613-7c20f73e7c5d
github.com/onap/multicloud-k8s/src/ncm v0.0.0-20200515060444-c77850a75eee
+ github.com/onap/multicloud-k8s/src/rsync v0.0.0-20200630152613-7c20f73e7c5d
github.com/pkg/errors v0.8.1
github.com/rubenv/sql-migrate v0.0.0-20200429072036-ae26b214fa43 // indirect
github.com/russross/blackfriday v1.5.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v1.0.0 // indirect
- github.com/technosophos/moniker v0.0.0-20180509230615-a5dbd03a2245 // indirect
go.etcd.io/etcd v3.3.12+incompatible
go.mongodb.org/mongo-driver v1.0.0
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
google.golang.org/grpc v1.27.1
google.golang.org/protobuf v1.24.0
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
+ gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86
- k8s.io/apiextensions-apiserver v0.0.0-00010101000000-000000000000 // indirect
k8s.io/apimachinery v0.0.0-20190831074630-461753078381
- k8s.io/apiserver v0.0.0-00010101000000-000000000000 // indirect
- k8s.io/cli-runtime v0.0.0-00010101000000-000000000000 // indirect
k8s.io/cloud-provider v0.0.0-00010101000000-000000000000 // indirect
k8s.io/helm v2.14.3+incompatible
sigs.k8s.io/kustomize v2.0.3+incompatible // indirect
- vbom.ml/util v0.0.0-20180919145318-efcd4e0f9787 // indirect
)
replace (
github.com/onap/multicloud-k8s/src/clm => ../clm
+ github.com/onap/multicloud-k8s/src/monitor => ../monitor
k8s.io/api => k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d
diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go
index a847ae32..cdf23bfa 100644
--- a/src/orchestrator/pkg/appcontext/appcontext.go
+++ b/src/orchestrator/pkg/appcontext/appcontext.go
@@ -18,10 +18,11 @@ package appcontext
import (
"fmt"
+ "strings"
+
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/rtcontext"
pkgerrors "github.com/pkg/errors"
- "strings"
)
// metaPrefix used for denoting clusterMeta level
@@ -413,6 +414,59 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string,
return v, nil
}
+// AddStatus adds a status node for holding the status of all resources
+// under the given app and cluster; handle should be a cluster handle
+func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) {
+ h, err := ac.rtc.RtcAddStatus(handle, value)
+ if err != nil {
+ return nil, err
+ }
+ log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h})
+
+ return h, nil
+}
+
+// DeleteStatus deletes the status node for the given handle
+func (ac *AppContext) DeleteStatus(handle interface{}) error {
+ err := ac.rtc.RtcDeletePair(handle)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// GetStatusHandle returns the status handle for a given app and cluster
+func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) {
+ if appname == "" {
+ return nil, pkgerrors.Errorf("Not a valid run time context app name")
+ }
+ if clustername == "" {
+ return nil, pkgerrors.Errorf("Not a valid run time context cluster name")
+ }
+
+ rh, err := ac.rtc.RtcGet()
+ if err != nil {
+ return nil, err
+ }
+
+ acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/status/"
+ hs, err := ac.rtc.RtcGetHandles(acrh)
+ if err != nil {
+ return nil, err
+ }
+ for _, v := range hs {
+ if v == acrh {
+ return v, nil
+ }
+ }
+ return nil, pkgerrors.Errorf("No handle was found for the given resource")
+}
+
+//UpdateStatusValue updates the status value with the given handle
+func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) error {
+ return ac.rtc.RtcUpdateValue(handle, value)
+}
+
//Return all the handles under the composite app
func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) {
hs, err := ac.rtc.RtcGetHandles(handle)
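
Taken together, the new helpers cover the status node's lifecycle. A minimal sketch, assuming clusterHandle is the handle under which the app's cluster was added and that cluster names follow the provider+cluster convention:

package example

import (
	appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
)

// statusLifecycle sketches the expected call sequence for the status node.
func statusLifecycle(ac appcontext.AppContext, clusterHandle interface{}, statusJSON string) error {
	// AddStatus creates the ".../status/" node under the cluster handle.
	sh, err := ac.AddStatus(clusterHandle, "{}")
	if err != nil {
		return err
	}
	// UpdateStatusValue overwrites the stored value as monitor data arrives.
	if err := ac.UpdateStatusValue(sh, statusJSON); err != nil {
		return err
	}
	// GetStatusHandle re-derives the same handle from app and cluster names.
	if _, err := ac.GetStatusHandle("app1", "provider1+cluster1"); err != nil {
		return err
	}
	// DeleteStatus removes the node when the context is torn down.
	return ac.DeleteStatus(sh)
}
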
diff --git a/src/orchestrator/pkg/appcontext/appcontext_test.go b/src/orchestrator/pkg/appcontext/appcontext_test.go
index 05c73703..92c43113 100644
--- a/src/orchestrator/pkg/appcontext/appcontext_test.go
+++ b/src/orchestrator/pkg/appcontext/appcontext_test.go
@@ -145,6 +145,10 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{
return c.Err
}
+func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+ return nil, nil
+}
+
func TestCreateCompositeApp(t *testing.T) {
var ac = AppContext{}
testCases := []struct {
diff --git a/src/orchestrator/pkg/grpc/installappclient/client.go b/src/orchestrator/pkg/grpc/installappclient/client.go
index 4c652a84..0e9141a6 100644
--- a/src/orchestrator/pkg/grpc/installappclient/client.go
+++ b/src/orchestrator/pkg/grpc/installappclient/client.go
@@ -19,10 +19,32 @@ import (
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller"
installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
pkgerrors "github.com/pkg/errors"
)
+const rsyncName = "rsync"
+
+// initRsyncClient initializes connections to the Resource Synchronizer service
+func initRsyncClient() bool {
+ client := controller.NewControllerClient()
+
+ vals, _ := client.GetControllers()
+ found := false
+ for _, v := range vals {
+ if v.Metadata.Name == rsyncName {
+ log.Info("Initializing RPC connection to resource synchronizer", log.Fields{
+ "Controller": v.Metadata.Name,
+ })
+ rpc.UpdateRpcConn(v.Metadata.Name, v.Spec.Host, v.Spec.Port)
+ found = true
+ break
+ }
+ }
+ return found
+}
+
// InvokeInstallApp will make the grpc call to the resource synchronizer
// or rsync controller.
// rsync will deploy the resources in the app context to the clusters as
@@ -34,7 +56,11 @@ func InvokeInstallApp(appContextId string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- conn := rpc.GetRpcConn("rsync")
+ conn := rpc.GetRpcConn(rsyncName)
+ if conn == nil {
+ initRsyncClient()
+ conn = rpc.GetRpcConn(rsyncName)
+ }
if conn != nil {
rpcClient = installpb.NewInstallappClient(conn)
@@ -64,3 +90,41 @@ func InvokeInstallApp(appContextId string) error {
}
return err
}
+
+func InvokeUninstallApp(appContextId string) error {
+ var err error
+ var rpcClient installpb.InstallappClient
+ var uninstallRes *installpb.UninstallAppResponse
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ conn := rpc.GetRpcConn("rsync")
+
+ if conn != nil {
+ rpcClient = installpb.NewInstallappClient(conn)
+ uninstallReq := new(installpb.UninstallAppRequest)
+ uninstallReq.AppContext = appContextId
+ uninstallRes, err = rpcClient.UninstallApp(ctx, uninstallReq)
+ if err == nil {
+ log.Info("Response from UninstappApp GRPC call", log.Fields{
+ "Succeeded": uninstallRes.AppContextUninstalled,
+ "Message": uninstallRes.AppContextUninstallMessage,
+ })
+ }
+ } else {
+ return pkgerrors.Errorf("UninstallApp Failed - Could not get InstallAppClient: %v", "rsync")
+ }
+
+ if err == nil {
+ if uninstallRes.AppContextUninstalled {
+ log.Info("UninstallApp Success", log.Fields{
+ "AppContext": appContextId,
+ "Message": uninstallRes.AppContextUninstallMessage,
+ })
+ return nil
+ } else {
+ return pkgerrors.Errorf("UninstallApp Failed: %v", uninstallRes.AppContextUninstallMessage)
+ }
+ }
+ return err
+}
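
Note the asymmetry introduced here: InvokeInstallApp falls back to initRsyncClient when no rsync connection exists yet, while InvokeUninstallApp only does a plain rpc.GetRpcConn lookup, so uninstall assumes a prior install already established the connection. A sketch of the intended call pattern (caller function names are illustrative):

package example

import (
	installappclient "github.com/onap/multicloud-k8s/src/orchestrator/pkg/grpc/installappclient"
)

// deploy hands the app context id to rsync for instantiation; the client
// lazily registers the rsync gRPC connection on first use.
func deploy(appContextID string) error {
	return installappclient.InvokeInstallApp(appContextID)
}

// tearDown asks rsync to uninstall the same context; this path relies on
// the connection established earlier by the install.
func tearDown(appContextID string) error {
	return installappclient.InvokeUninstallApp(appContextID)
}
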
diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go
index 16a14c7b..0decb68f 100644
--- a/src/orchestrator/pkg/module/deployment_intent_groups.go
+++ b/src/orchestrator/pkg/module/deployment_intent_groups.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"reflect"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
pkgerrors "github.com/pkg/errors"
@@ -61,6 +62,7 @@ type OverrideValues struct {
type DeploymentIntentGroupManager interface {
CreateDeploymentIntentGroup(d DeploymentIntentGroup, p string, ca string, v string) (DeploymentIntentGroup, error)
GetDeploymentIntentGroup(di string, p string, ca string, v string) (DeploymentIntentGroup, error)
+ GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, string, error)
DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error
}
@@ -86,6 +88,7 @@ func (dk DeploymentIntentGroupKey) String() string {
type DeploymentIntentGroupClient struct {
storeName string
tagMetaData string
+ tagContext string
}
// NewDeploymentIntentGroupClient return an instance of DeploymentIntentGroupClient which implements DeploymentIntentGroupManager
@@ -93,6 +96,7 @@ func NewDeploymentIntentGroupClient() *DeploymentIntentGroupClient {
return &DeploymentIntentGroupClient{
storeName: "orchestrator",
tagMetaData: "deploymentintentgroupmetadata",
+ tagContext: "contextid",
}
}
@@ -160,6 +164,34 @@ func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroup(di string, p stri
}
+// GetDeploymentIntentGroupContext returns the AppContext and context id for the DeploymentIntentGroup with a given name, project, compositeApp and version of compositeApp
+func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, string, error) {
+
+ key := DeploymentIntentGroupKey{
+ Name: di,
+ Project: p,
+ CompositeApp: ca,
+ Version: v,
+ }
+
+ result, err := db.DBconn.Find(c.storeName, key, c.tagContext)
+ if err != nil {
+ return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Get DeploymentIntentGroup Context error")
+ }
+
+ if result != nil {
+ ctxVal := string(result[0])
+ var cc appcontext.AppContext
+ _, err = cc.LoadAppContext(ctxVal)
+ if err != nil {
+ return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Error loading DeploymentIntentGroup Appcontext")
+ }
+ return cc, ctxVal, nil
+ }
+
+ return appcontext.AppContext{}, "", pkgerrors.New("Error getting DeploymentIntentGroup AppContext")
+}
+
// DeleteDeploymentIntentGroup deletes a DeploymentIntentGroup
func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error {
k := DeploymentIntentGroupKey{
@@ -168,10 +200,14 @@ func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p s
CompositeApp: ca,
Version: v,
}
+ _, _, err := c.GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err == nil {
+ return pkgerrors.Wrap(err, "DeploymentIntentGroup must be terminated before it can be deleted "+di)
+ }
- err := db.DBconn.Remove(c.storeName, k)
+ err = db.DBconn.Remove(c.storeName, k)
if err != nil {
- return pkgerrors.Wrap(err, "Delete DeploymentIntentGroup entry;")
+ return pkgerrors.Wrap(err, "Error deleting DeploymentIntentGroup entry")
}
return nil
diff --git a/src/orchestrator/pkg/module/instantiation.go b/src/orchestrator/pkg/module/instantiation.go
index 043b80f2..1f2e1117 100644
--- a/src/orchestrator/pkg/module/instantiation.go
+++ b/src/orchestrator/pkg/module/instantiation.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
+ rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
@@ -42,17 +43,25 @@ type InstantiationClient struct {
db InstantiationClientDbInfo
}
+type ClusterAppStatus struct {
+ Cluster string
+ App string
+ Status rb.ResourceBundleStatus
+}
+
+type StatusData struct {
+ Data []ClusterAppStatus
+}
+
/*
InstantiationKey used for storing the contextid in mongodb.
It consists of
-GenericPlacementIntentName,
ProjectName,
CompositeAppName,
CompositeAppVersion,
DeploymentIntentGroup
*/
type InstantiationKey struct {
- IntentName string
Project string
CompositeApp string
Version string
@@ -64,6 +73,8 @@ type InstantiationKey struct {
type InstantiationManager interface {
//ApproveInstantiation(p string, ca string, v string, di string) (error)
Instantiate(p string, ca string, v string, di string) error
+ Status(p string, ca string, v string, di string) (StatusData, error)
+ Terminate(p string, ca string, v string, di string) error
}
// InstantiationClientDbInfo consists of storeName and tagContext
@@ -177,6 +188,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
if err != nil {
return pkgerrors.Wrap(err, "Not finding the deploymentIntentGroup")
}
+
+ _, _, err = NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err == nil {
+ return pkgerrors.Errorf("DeploymentIntentGroup has already been instantiated: " + di)
+ }
+
rName := dIGrp.Spec.Version //rName is releaseName
overrideValues := dIGrp.Spec.OverrideValuesObj
cp := dIGrp.Spec.Profile
@@ -229,6 +246,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name)
}
+ statusResource, err := getStatusResource(ctxval.(string), eachApp.Metadata.Name)
+ if err != nil {
+ return pkgerrors.Wrapf(err, "Unable to generate the status resource for app :: %s", eachApp.Metadata.Name)
+ }
+ resources = append(resources, statusResource)
+
specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent)
if err != nil {
return pkgerrors.Wrap(err, "Unable to get the intents for app")
@@ -269,12 +292,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
//END: storing into etcd
// BEGIN:: save the context in the orchestrator db record
- key := InstantiationKey{
- IntentName: gIntent,
- Project: p,
- CompositeApp: ca,
- Version: v,
- DeploymentIntentGroup: di,
+ key := DeploymentIntentGroupKey{
+ Name: di,
+ Project: p,
+ CompositeApp: ca,
+ Version: v,
}
err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagContext, ctxval)
@@ -315,7 +337,7 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
// END: Scheduler code
// BEGIN : Rsync code
- err = callRsync(ctxval)
+ err = callRsyncInstall(ctxval)
if err != nil {
return err
}
@@ -324,3 +346,101 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca})
return err
}
+
+/*
+Status takes in projectName, compositeAppName, compositeAppVersion and
+DeploymentIntentName. It obtains the status of the deployment, which is
+made available in the appcontext.
+*/
+func (c InstantiationClient) Status(p string, ca string, v string, di string) (StatusData, error) {
+
+ ac, _, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err != nil {
+ return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found: "+di)
+ }
+
+ // Get all apps in this composite app
+ allApps, err := NewAppClient().GetApps(p, ca, v)
+ if err != nil {
+ return StatusData{}, pkgerrors.Wrap(err, "Not finding the apps")
+ }
+
+ var diStatus StatusData
+ diStatus.Data = make([]ClusterAppStatus, 0)
+
+ // Loop through each app and get the status data for each cluster in the app
+ for _, app := range allApps {
+ // Get the clusters in the appcontext for this app
+ clusters, err := ac.GetClusterNames(app.Metadata.Name)
+ if err != nil {
+ log.Info(":: No clusters for app ::", log.Fields{"AppName": app.Metadata.Name})
+ continue
+ }
+
+ for _, cluster := range clusters {
+ handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster)
+ if err != nil {
+ log.Info(":: No status handle for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ statusValue, err := ac.GetValue(handle)
+ if err != nil {
+ log.Info(":: No status value for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ log.Info(":: STATUS VALUE ::", log.Fields{"statusValue": statusValue})
+ var statusData ClusterAppStatus
+ err = json.Unmarshal([]byte(statusValue.(string)), &statusData.Status)
+ if err != nil {
+ log.Info(":: Error unmarshaling status value for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ statusData.Cluster = cluster
+ statusData.App = app.Metadata.Name
+ log.Info(":: STATUS DATA ::", log.Fields{"status": statusData})
+
+ diStatus.Data = append(diStatus.Data, statusData)
+ }
+ }
+
+ return diStatus, nil
+}
+
+/*
+Terminate takes in projectName, compositeAppName, compositeAppVersion,
+DeploymentIntentName and calls rsync to terminate.
+*/
+func (c InstantiationClient) Terminate(p string, ca string, v string, di string) error {
+
+ ac, ctxval, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err != nil {
+ return pkgerrors.Wrap(err, "DeploymentIntentGroup has no app context: "+di)
+ }
+
+ err = callRsyncUninstall(ctxval)
+ if err != nil {
+ return err
+ }
+
+ err = ac.DeleteCompositeApp()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error deleting the app context for DeploymentIntentGroup: "+di)
+ }
+
+ key := DeploymentIntentGroupKey{
+ Name: di,
+ Project: p,
+ CompositeApp: ca,
+ Version: v,
+ }
+
+ err = db.DBconn.RemoveTag(c.db.storeName, key, c.db.tagContext)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error removing the app context tag from DeploymentIntentGroup: "+di)
+ }
+
+ return nil
+}
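
A hedged sketch of consuming Status from Go rather than over REST; NewInstantiationClient is assumed to be the existing constructor in this package, and the project/app names are illustrative:

package example

import (
	"encoding/json"
	"fmt"

	moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module"
)

// printStatus fetches the per-cluster, per-app status for a deployment
// intent group and prints it as indented JSON.
func printStatus() error {
	c := moduleLib.NewInstantiationClient()
	status, err := c.Status("p1", "ca1", "v1", "dig1")
	if err != nil {
		return err
	}
	for _, s := range status.Data {
		fmt.Printf("app %s on cluster %s\n", s.App, s.Cluster)
	}
	b, _ := json.MarshalIndent(status, "", "  ")
	fmt.Println(string(b))
	return nil
}
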
diff --git a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
index 43ddd6df..e6e2bf30 100644
--- a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
+++ b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
@@ -25,12 +25,16 @@ import (
"encoding/json"
"io/ioutil"
+ jyaml "github.com/ghodss/yaml"
+
+ rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
"github.com/onap/multicloud-k8s/src/orchestrator/utils"
"github.com/onap/multicloud-k8s/src/orchestrator/utils/helm"
pkgerrors "github.com/pkg/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// resource consists of name of reource
@@ -90,6 +94,45 @@ func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) {
return resources, nil
}
+// getStatusResource generates the status monitoring resource (a ResourceBundleState CR)
+// for the app; the returned resource consists of name (name+kind) and content
+func getStatusResource(id, app string) (resource, error) {
+
+ var statusCr rb.ResourceBundleState
+
+ label := id + "-" + app
+ name := app + "-" + id
+
+ statusCr.TypeMeta.APIVersion = "k8splugin.io/v1alpha1"
+ statusCr.TypeMeta.Kind = "ResourceBundleState"
+ statusCr.SetName(name)
+
+ labels := make(map[string]string)
+ labels["emco/deployment-id"] = label
+ statusCr.SetLabels(labels)
+
+ labelSelector, err := metav1.ParseToLabelSelector("emco/deployment-id = " + label)
+ if err != nil {
+ log.Info(":: ERROR Parsing Label Selector ::", log.Fields{"Error": err})
+ } else {
+ statusCr.Spec.Selector = labelSelector
+ }
+
+	// Marshaling to json and then converting to yaml works better than marshaling directly to yaml:
+	// the 'apiVersion' attribute was otherwise being marshaled as 'apiversion'
+ // y, _ := yaml.Marshal(&statusCr)
+ j, _ := json.Marshal(&statusCr)
+ y, _ := jyaml.JSONToYAML(j)
+ log.Info(":: RESULTING STATUS CR ::", log.Fields{"StatusCR": y})
+
+ statusResource := resource{
+ name: name + "+" + "ResourceBundleState",
+ filecontent: string(y),
+ }
+
+ return statusResource, nil
+}
+
func addResourcesToCluster(ct appcontext.AppContext, ch interface{}, resources []resource) error {
var resOrderInstr struct {
diff --git a/src/orchestrator/pkg/module/instantiation_scheduler_helper.go b/src/orchestrator/pkg/module/instantiation_scheduler_helper.go
index 3d9d851c..184d6972 100644
--- a/src/orchestrator/pkg/module/instantiation_scheduler_helper.go
+++ b/src/orchestrator/pkg/module/instantiation_scheduler_helper.go
@@ -192,9 +192,9 @@ func callGrpcForControllerList(cl []controller.Controller, mc map[string]string,
}
/*
-callRsync method shall take in the app context id and invokes the rsync service via grpc
+callRsyncInstall method takes in the app context id and invokes the rsync service via grpc
*/
-func callRsync(contextid interface{}) error {
+func callRsyncInstall(contextid interface{}) error {
appContextID := fmt.Sprintf("%v", contextid)
err := rsyncclient.InvokeInstallApp(appContextID)
if err != nil {
@@ -204,6 +204,18 @@ func callRsync(contextid interface{}) error {
}
/*
+callRsyncUninstall method takes in the app context id and invokes the rsync service via grpc
+*/
+func callRsyncUninstall(contextid interface{}) error {
+ appContextID := fmt.Sprintf("%v", contextid)
+ err := rsyncclient.InvokeUninstallApp(appContextID)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+/*
deleteExtraClusters method shall delete the extra cluster handles for each AnyOf cluster present in the etcd after the grpc call for context update.
*/
func deleteExtraClusters(apps []App, ct appcontext.AppContext) error {
diff --git a/src/orchestrator/pkg/rtcontext/rtcontext.go b/src/orchestrator/pkg/rtcontext/rtcontext.go
index 432c5d87..f3905eb0 100644
--- a/src/orchestrator/pkg/rtcontext/rtcontext.go
+++ b/src/orchestrator/pkg/rtcontext/rtcontext.go
@@ -18,11 +18,12 @@ package rtcontext
import (
"fmt"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
- pkgerrors "github.com/pkg/errors"
"math/rand"
"strings"
"time"
+
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
+ pkgerrors "github.com/pkg/errors"
)
const maxrand = 0x7fffffffffffffff
@@ -40,6 +41,7 @@ type Rtcontext interface {
RtcAddMeta(meta interface{}) error
RtcGet() (interface{}, error)
RtcAddLevel(handle interface{}, level string, value string) (interface{}, error)
+ RtcAddStatus(handle interface{}, value interface{}) (interface{}, error)
RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error)
RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error)
RtcDeletePair(handle interface{}) error
@@ -201,6 +203,26 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in
return (interface{})(key), nil
}
+// Add status under the given level and return new handle
+func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+
+ str := fmt.Sprintf("%v", handle)
+ sid := fmt.Sprintf("%v", rtc.cid)
+ if !strings.HasPrefix(str, sid) {
+ return nil, pkgerrors.Errorf("Not a valid run time context handle")
+ }
+ if value == nil {
+ return nil, pkgerrors.Errorf("Not a valid run time context resource value")
+ }
+
+ k := str + "status" + "/"
+ err := contextdb.Db.Put(k, value)
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error())
+ }
+ return (interface{})(k), nil
+}
+
// Add a resource under the given level and return new handle
func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) {
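
RtcAddStatus appends a fixed "status/" segment to whatever handle it is given, so for a cluster-level handle the stored etcd key takes the shape below (context id illustrative). GetStatusHandle in appcontext.go rebuilds exactly this path from the app and cluster names.

/context/9345674458787728/app/app1/cluster/provider1+cluster1/status/
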
diff --git a/src/rsync/go.mod b/src/rsync/go.mod
index a2c5f83b..c6831e5e 100644
--- a/src/rsync/go.mod
+++ b/src/rsync/go.mod
@@ -5,12 +5,7 @@ go 1.13
require (
github.com/golang/protobuf v1.4.1
github.com/googleapis/gnostic v0.4.0
- github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
- github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
- github.com/mattn/go-isatty v0.0.4 // indirect
- github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4
- go.etcd.io/bbolt v1.3.3 // indirect
google.golang.org/grpc v1.27.1
k8s.io/kubernetes v1.14.1
)
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go
index 6e17f87a..fc8aa839 100644
--- a/src/rsync/pkg/connector/connector.go
+++ b/src/rsync/pkg/connector/connector.go
@@ -19,8 +19,6 @@ package connector
import (
"log"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
-
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -52,7 +50,7 @@ type KubernetesConnector interface {
// Reference is the interface that is implemented
type Reference interface {
//Create a kubernetes resource described by the yaml in yamlFilePath
- Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+ Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error)
//Delete a kubernetes resource described in the provided namespace
Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
}
@@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
if labels == nil {
labels = map[string]string{}
}
- labels[config.GetConfiguration().KubernetesLabelName] = tag
+ labels["emco/deployment-id"] = tag
podTemplateSpec.SetLabels(labels)
updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
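
The hardcoded "emco/deployment-id" key pairs with the per-app label value computed in the context.go changes that follow, and with the selector generated by getStatusResource in the orchestrator. A sketch of the value derivation, mirroring the instantiateResources hunk below (id illustrative):

package example

import "strings"

// deriveLabel mirrors the logic added to instantiateResources: the composite
// app handle "/context/<id>/" is split on "/" and element 2 is the bare id.
func deriveLabel(cid, appname string) string {
	parts := strings.Split(cid, "/") // ["", "context", "<id>", ""]
	return parts[2] + "-" + appname  // e.g. "9345674458787728-app1"
}
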
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index e5da1296..7e0fce3c 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -18,67 +18,68 @@ package context
import (
"encoding/json"
- "fmt"
- "log"
- "sync"
- "strings"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
- pkgerrors "github.com/pkg/errors"
- res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
- con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+ "fmt"
+ "log"
+ "strings"
+ "sync"
+
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+ con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+ status "github.com/onap/multicloud-k8s/src/rsync/pkg/status"
+ pkgerrors "github.com/pkg/errors"
)
type CompositeAppContext struct {
- cid interface{}
+ cid interface{}
appsorder string
appsdependency string
- appsmap []instMap
+ appsmap []instMap
}
type clusterInfo struct {
- name string
+ name string
resorder string
resdependency string
- ressmap []instMap
+ ressmap []instMap
}
type instMap struct {
- name string
- depinfo string
- status string
- rerr error
+ name string
+ depinfo string
+ status string
+ rerr error
clusters []clusterInfo
}
func getInstMap(order string, dependency string, level string) ([]instMap, error) {
- if order == "" {
- return nil, pkgerrors.Errorf("Not a valid order value")
- }
- if dependency == "" {
- return nil, pkgerrors.Errorf("Not a valid dependency value")
- }
-
- if !(level == "app" || level == "res") {
- return nil, pkgerrors.Errorf("Not a valid level name given to create map")
- }
+ if order == "" {
+ return nil, pkgerrors.Errorf("Not a valid order value")
+ }
+ if dependency == "" {
+ return nil, pkgerrors.Errorf("Not a valid dependency value")
+ }
+ if !(level == "app" || level == "res") {
+ return nil, pkgerrors.Errorf("Not a valid level name given to create map")
+ }
- var aov map[string]interface{}
- json.Unmarshal([]byte(order), &aov)
+ var aov map[string]interface{}
+ json.Unmarshal([]byte(order), &aov)
- s := fmt.Sprintf("%vorder", level)
- appso := aov[s].([]interface{})
- var instmap = make([]instMap, len(appso))
+ s := fmt.Sprintf("%vorder", level)
+ appso := aov[s].([]interface{})
+ var instmap = make([]instMap, len(appso))
- var adv map[string]interface{}
- json.Unmarshal([]byte(dependency), &adv)
- s = fmt.Sprintf("%vdependency", level)
- appsd := adv[s].(map[string]interface{})
- for i, u := range appso {
- instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
- }
+ var adv map[string]interface{}
+ json.Unmarshal([]byte(dependency), &adv)
+ s = fmt.Sprintf("%vdependency", level)
+ appsd := adv[s].(map[string]interface{})
+ for i, u := range appso {
+ instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
+ }
- return instmap, nil
+ return instmap, nil
}
func deleteResource(clustername string, resname string, respath string) error {
@@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error {
var gp res.Resource
err = gp.Delete(respath, resname, "default", c)
if err != nil {
- log.Println("Delete resource failed: " + err.Error() + resname)
+ log.Println("Delete resource failed: " + err.Error() + resname)
return err
}
log.Println("Resource succesfully deleted", resname)
@@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error {
}
-func createResource(clustername string, resname string, respath string) error {
+func createResource(clustername string, resname string, respath string, label string) error {
k8sClient := app.KubernetesClient{}
err := k8sClient.Init(clustername, resname)
if err != nil {
@@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error {
var c con.KubernetesConnector
c = &k8sClient
var gp res.Resource
- _, err = gp.Create(respath,"default", c)
+ _, err = gp.Create(respath, "default", label, c)
if err != nil {
- log.Println("Create failed: " + err.Error() + resname)
+ log.Println("Create failed: " + err.Error() + resname)
return err
}
log.Println("Resource succesfully created", resname)
@@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string,
}
-func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error {
rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
if err != nil {
return err
@@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
if result[0] == "" {
return pkgerrors.Errorf("Resource name is nil")
}
- err = createResource(clustername, result[0], resval.(string))
+ err = createResource(clustername, result[0], resval.(string), label)
if err != nil {
return err
}
@@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
}
-func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i:=0; i<len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <- chans[index]
- if c != index {
+func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i := 0; i < len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <-chans[index]
+ if c != index {
ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
- for j:=0; j<len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k:=0; k<len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources termination")
- }
- }
- return nil
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+ for j := 0; j < len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k := 0; k < len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources termination")
+ }
+ }
+ return nil
}
-func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i:=0; i<len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <- chans[index]
- if c != index {
+func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ cid, _ := ac.GetCompositeAppHandle()
+
+ results := strings.Split(cid.(string), "/")
+ label := results[2] + "-" + appname
+
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i := 0; i < len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <-chans[index]
+ if c != index {
ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
- for j:=0; j<len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k:=0; k<len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources instantiation")
- }
- }
- return nil
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+ for j := 0; j < len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k := 0; k < len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources instantiation")
+ }
+ }
+ return nil
}
func terminateApp(ac appcontext.AppContext, appmap instMap) error {
- for i:=0; i<len(appmap.clusters); i++ {
+ for i := 0; i < len(appmap.clusters); i++ {
err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
+ appmap.clusters[i].name)
if err != nil {
return err
}
@@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error {
}
-
func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
- for i:=0; i<len(appmap.clusters); i++ {
+ for i := 0; i < len(appmap.clusters); i++ {
err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
+ appmap.clusters[i].name)
if err != nil {
return err
}
+ err = status.StartClusterWatcher(appmap.clusters[i].name)
+ if err != nil {
+ log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err)
+ }
}
log.Println("Instantiation of app done: " + appmap.name)
return nil
}
-func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
var wg sync.WaitGroup
var chans = make([]chan int, len(appsmap))
for l := range chans {
chans[l] = make(chan int)
}
- for i:=0; i<len(appsmap); i++ {
+ for i := 0; i < len(appsmap); i++ {
wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
appsmap[index].status = "start"
} else {
appsmap[index].status = "waiting"
- c := <- chans[index]
+ c := <-chans[index]
if c != index {
appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
}
@@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
}
appsmap[index].rerr = instantiateApp(ac, appsmap[index])
appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
- for j:=0; j<len(appsmap); j++ {
+ waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+ for j := 0; j < len(appsmap); j++ {
if appsmap[j].depinfo == waitstr {
chans[j] <- j
}
}
wg.Done()
- }(i)
- }
+ }(i)
+ }
wg.Wait()
- for k:=0; k<len(appsmap); k++ {
+ for k := 0; k < len(appsmap); k++ {
if appsmap[k].rerr != nil {
return pkgerrors.Errorf("error during apps instantiation")
}
@@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
ac := appcontext.AppContext{}
_, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.cid = cid
appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsorder = appsorder.(string)
appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
- if err != nil {
- return err
- }
+ instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
- for j:=0; j<len(instca.appsmap); j++ {
+ for j := 0; j < len(instca.appsmap); j++ {
clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k:=0; k<len(clusternames); k++ {
+ for k := 0; k < len(clusternames); k++ {
instca.appsmap[j].clusters[k].name = clusternames[k]
resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
+ instca.appsmap[j].name, clusternames[k], "order")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resorder = resorder.(string)
resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
+ instca.appsmap[j].name, clusternames[k], "dependency")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
@@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
instca.appsmap[j].clusters[k].resorder,
instca.appsmap[j].clusters[k].resdependency, "res")
if err != nil {
- return err
+ return err
}
}
}
err = instantiateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
return nil
}
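
A hedged sketch of how a caller might drive InstantiateComApp; installApp and the literal context ID are placeholders, standing in for rsync's install entry point and the appcontext ID it would receive over gRPC:

package main

import (
	"log"

	con "github.com/onap/multicloud-k8s/src/rsync/pkg/context"
)

// installApp stands in for rsync's install entry point; cid is the
// appcontext ID it would receive over gRPC.
func installApp(cid string) error {
	instca := con.CompositeAppContext{}
	if err := instca.InstantiateComApp(cid); err != nil {
		log.Println("Instantiation failed:", err)
		return err
	}
	return nil
}

func main() { _ = installApp("1234567890") }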
// Delete all the apps
-func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
var wg sync.WaitGroup
var chans = make([]chan int, len(appsmap))
for l := range chans {
chans[l] = make(chan int)
}
- for i:=0; i<len(appsmap); i++ {
+ for i := 0; i < len(appsmap); i++ {
wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
appsmap[index].status = "start"
} else {
appsmap[index].status = "waiting"
- c := <- chans[index]
+ c := <-chans[index]
if c != index {
appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
}
@@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
}
appsmap[index].rerr = terminateApp(ac, appsmap[index])
appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
- for j:=0; j<len(appsmap); j++ {
+ waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+ for j := 0; j < len(appsmap); j++ {
if appsmap[j].depinfo == waitstr {
chans[j] <- j
}
}
wg.Done()
- }(i)
- }
+ }(i)
+ }
wg.Wait()
- for k:=0; k<len(appsmap); k++ {
+ for k := 0; k < len(appsmap); k++ {
if appsmap[k].rerr != nil {
			return pkgerrors.Errorf("error during apps termination")
}
@@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
return nil
}
+
// Delete all the resources for a given context
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
ac := appcontext.AppContext{}
_, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.cid = cid
appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsorder = appsorder.(string)
appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
- if err != nil {
- return err
- }
+ instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
- for j:=0; j<len(instca.appsmap); j++ {
+ for j := 0; j < len(instca.appsmap); j++ {
clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k:=0; k<len(clusternames); k++ {
+ for k := 0; k < len(clusternames); k++ {
instca.appsmap[j].clusters[k].name = clusternames[k]
resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
+ instca.appsmap[j].name, clusternames[k], "order")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resorder = resorder.(string)
resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
+ instca.appsmap[j].name, clusternames[k], "dependency")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
@@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
instca.appsmap[j].clusters[k].resorder,
instca.appsmap[j].clusters[k].resdependency, "res")
if err != nil {
- return err
+ return err
}
}
}
err = terminateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
	return nil
}
diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go
index 8b45c341..2877e2a3 100644
--- a/src/rsync/pkg/resource/resource.go
+++ b/src/rsync/pkg/resource/resource.go
@@ -20,16 +20,15 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
- utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
"github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
)
type Resource struct {
}
// Create deployment object in a specific Kubernetes cluster
-func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) {
+func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) {
if namespace == "" {
namespace = "default"
}
@@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
if labels == nil {
labels = map[string]string{}
}
- labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ labels["emco/deployment-id"] = label
unstruct.SetLabels(labels)
// This checks if the resource we are creating has a podSpec in it
// Eg: Deployment, StatefulSet, Job etc..
// If a PodSpec is found, the label will be added to it too.
- connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ connector.TagPodsIfPresent(unstruct, label)
gvr := mapping.Resource
var createdObj *unstructured.Unstructured
@@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
// Delete an existing resource hosted in a specific Kubernetes cluster
func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error {
- if namespace == "" {
- namespace = "default"
- }
-
- //Decode the yaml file to create a runtime.Object
- unstruct := &unstructured.Unstructured{}
- //Ignore the returned obj as we expect the data in unstruct
- _, err := utils.DecodeYAMLData(data, unstruct)
- if err != nil {
- return pkgerrors.Wrap(err, "Decode deployment object error")
- }
-
- dynClient := client.GetDynamicClient()
- mapper := client.GetMapper()
-
- gvk := unstruct.GroupVersionKind()
- mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
- if err != nil {
- return pkgerrors.Wrap(err, "Mapping kind to resource error")
- }
-
- gvr := mapping.Resource
- deletePolicy := metav1.DeletePropagationForeground
- opts := &metav1.DeleteOptions{
- PropagationPolicy: &deletePolicy,
- }
-
- switch mapping.Scope.Name() {
- case meta.RESTScopeNameNamespace:
- err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
- case meta.RESTScopeNameRoot:
- err = dynClient.Resource(gvr).Delete(resname, opts)
- default:
- return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
- }
-
- if err != nil {
- return pkgerrors.Wrap(err, "Delete object error")
- }
- return nil
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ //Decode the yaml file to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err := utils.DecodeYAMLData(data, unstruct)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Decode deployment object error")
+ }
+
+ dynClient := client.GetDynamicClient()
+ mapper := client.GetMapper()
+
+ gvk := unstruct.GroupVersionKind()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Mapping kind to resource error")
+ }
+
+ gvr := mapping.Resource
+ deletePolicy := metav1.DeletePropagationForeground
+ opts := &metav1.DeleteOptions{
+ PropagationPolicy: &deletePolicy,
+ }
+
+ switch mapping.Scope.Name() {
+ case meta.RESTScopeNameNamespace:
+ err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
+ case meta.RESTScopeNameRoot:
+ err = dynClient.Resource(gvr).Delete(resname, opts)
+ default:
+		return pkgerrors.New("Got an unknown RESTScopeName for mapping")
+ }
+
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete object error")
+ }
+ return nil
}
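
With the signature change above, callers of Resource.Create pass the deployment label explicitly rather than relying on the connector's instance ID. A sketch of the new call shape (createWithLabel and the label value are illustrative; client construction is elided):

package sketch

import (
	"github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
	"github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
)

// createWithLabel is illustrative: the label argument is the
// <contextID>-<appname> string built by instantiateResources, and every
// object created (plus any pod template inside it) is tagged
// emco/deployment-id=<label> for the monitor to pick up.
func createWithLabel(client connector.KubernetesConnector, yamlData string) (string, error) {
	var r resource.Resource
	return r.Create(yamlData, "default", "1234567890-myapp", client)
}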
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
new file mode 100644
index 00000000..351da027
--- /dev/null
+++ b/src/rsync/pkg/status/status.go
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2020 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package status
+
+import (
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
+
+ pkgerrors "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
+ v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+ clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
+ informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/clientcmd"
+)
+
+type channelManager struct {
+ channels map[string]chan struct{}
+ sync.Mutex
+}
+
+var channelData channelManager
+
+const monitorLabel = "emco/deployment-id"
+
+// HandleStatusUpdate processes a ResourceBundleState status update for an application in a cluster
+// TODO: Add code for specific handling
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
+ //status := v.Status.ServiceStatuses
+ //podStatus := v.Status.PodStatuses
+
+ // Get the contextId from the label (id)
+ result := strings.SplitN(id, "-", 2)
+ if result[0] == "" {
+ logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+ return
+ }
+
+ if len(result) != 2 {
+ logrus.Info(clusterId, "::invalid label format::", id)
+ return
+ }
+
+ // Get the app from the label (id)
+ if result[1] == "" {
+ logrus.Info(clusterId, "::label is missing an app identifier::", id)
+ return
+ }
+
+ // Look up the contextId
+ var ac appcontext.AppContext
+ _, err := ac.LoadAppContext(result[0])
+ if err != nil {
+ logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+ return
+ }
+
+	// produce a JSON representation of the status
+ vjson, err := json.Marshal(v.Status)
+ if err != nil {
+ logrus.Info(clusterId, "::Error marshalling status information::", err)
+ return
+ }
+
+ // Get the handle for the context/app/cluster status object
+ handle, err := ac.GetStatusHandle(result[1], clusterId)
+ if err != nil {
+ // Expected first time
+ logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+ }
+
+ // If status handle was not found, then create the status object in the appcontext
+ if handle == nil {
+ chandle, err := ac.GetClusterHandle(result[1], clusterId)
+ if err != nil {
+ logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+ } else {
+ ac.AddStatus(chandle, string(vjson))
+ }
+ } else {
+ ac.UpdateStatusValue(handle, string(vjson))
+ }
+}
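
Because SplitN is called with a limit of 2, the label is split only at the first dash: the appcontext ID must not contain a dash, but the app name may. A small runnable illustration:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// label written by instantiateResources: <contextID>-<appname>
	id := "1234567890-my-app"
	result := strings.SplitN(id, "-", 2)
	fmt.Println(result[0]) // 1234567890 (appcontext ID)
	fmt.Println(result[1]) // my-app (app name; may itself contain dashes)
}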
+
+// StartClusterWatcher starts watching ResourceBundleState CRs on the given
+// cluster; the kubeconfig needed to connect is fetched via the connectivity client.
+func StartClusterWatcher(clusterId string) error {
+
+ configBytes, err := getKubeConfig(clusterId)
+ if err != nil {
+ return err
+ }
+
+ //key := provider + "+" + name
+ // Get the lock
+ channelData.Lock()
+ defer channelData.Unlock()
+ // For first time
+ if channelData.channels == nil {
+ channelData.channels = make(map[string]chan struct{})
+ }
+ _, ok := channelData.channels[clusterId]
+ if !ok {
+ // Create Channel
+ channelData.channels[clusterId] = make(chan struct{})
+ // Create config
+ config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
+ if err != nil {
+ logrus.Info(fmt.Sprintf("RESTConfigFromKubeConfig error: %s", err.Error()))
+ return pkgerrors.Wrap(err, "RESTConfigFromKubeConfig error")
+ }
+ k8sClient, err := clientset.NewForConfig(config)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Clientset NewForConfig error")
+ }
+ // Create Informer
+ mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
+ mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
+ go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
+ }
+ return nil
+}
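
StartClusterWatcher is effectively idempotent per cluster: the channel map doubles as a registry, so a second call for a cluster that is already being watched does nothing. A usage sketch (the cluster ID is a placeholder in the provider+cluster form getKubeConfig expects):

package sketch

import (
	"log"

	"github.com/onap/multicloud-k8s/src/rsync/pkg/status"
)

func watchCluster() {
	// "provider1+cluster1" is a placeholder in the provider+cluster form
	// that getKubeConfig (below) expects.
	if err := status.StartClusterWatcher("provider1+cluster1"); err != nil {
		log.Println("Error starting cluster watcher:", err)
	}
	// ... later, when updates are no longer needed:
	status.StopClusterWatcher("provider1+cluster1")
}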
+
+// StopClusterWatcher stops watching a cluster
+func StopClusterWatcher(clusterId string) {
+ //key := provider + "+" + name
+ if channelData.channels != nil {
+ c, ok := channelData.channels[clusterId]
+ if ok {
+ close(c)
+ }
+ }
+}
+
+// CloseAllClusterWatchers close all channels
+func CloseAllClusterWatchers() {
+ if channelData.channels == nil {
+ return
+ }
+ // Close all Channels to stop all watchers
+ for _, e := range channelData.channels {
+ close(e)
+ }
+}
+
+// Per Cluster Go routine to watch CR
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
+ handlers := cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ v, ok := obj.(*v1alpha1.ResourceBundleState)
+ if ok {
+ labels := v.GetLabels()
+ l, ok := labels[monitorLabel]
+ if ok {
+ HandleStatusUpdate(clusterId, l, v)
+ }
+ }
+ },
+ UpdateFunc: func(oldObj, obj interface{}) {
+ v, ok := obj.(*v1alpha1.ResourceBundleState)
+ if ok {
+ labels := v.GetLabels()
+ l, ok := labels[monitorLabel]
+ if ok {
+ HandleStatusUpdate(clusterId, l, v)
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ // Ignore it
+ },
+ }
+ s.AddEventHandler(handlers)
+ s.Run(c)
+}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the
+// given cluster name and returns it as decoded bytes.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+ if !strings.Contains(clustername, "+") {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+		return nil, pkgerrors.Wrap(err, "Get kubeconfig failed")
+ }
+
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dec, nil
+}
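
getKubeConfig expects the composite provider+cluster name used throughout rsync and returns the base64-decoded kubeconfig that CLM stores. The name handling and decoding reduce to the following (values are illustrative):

package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

func main() {
	clustername := "provider1+cluster1"
	strs := strings.Split(clustername, "+")
	fmt.Println(strs[0], strs[1]) // provider1 cluster1

	// CLM stores the kubeconfig base64-encoded; decode it before use.
	enc := base64.StdEncoding.EncodeToString([]byte("apiVersion: v1\nkind: Config\n"))
	dec, _ := base64.StdEncoding.DecodeString(enc)
	fmt.Print(string(dec))
}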