diff options
Diffstat (limited to 'src')
30 files changed, 1048 insertions, 383 deletions
diff --git a/src/clm/api/clusterhandler_test.go b/src/clm/api/clusterhandler_test.go index 7095e87c..4bbc91b1 100644 --- a/src/clm/api/clusterhandler_test.go +++ b/src/clm/api/clusterhandler_test.go @@ -102,12 +102,12 @@ func (m *mockClusterManager) GetClusterContent(provider, name string) (cluster.C return m.ClusterContentItems[0], nil } -func (m *mockClusterManager) GetClusterContext(provider, name string) (appcontext.AppContext, error) { +func (m *mockClusterManager) GetClusterContext(provider, name string) (appcontext.AppContext, string, error) { if m.Err != nil { - return appcontext.AppContext{}, m.Err + return appcontext.AppContext{}, "", m.Err } - return m.ClusterContextItems[0], nil + return m.ClusterContextItems[0], "", nil } func (m *mockClusterManager) GetClusters(provider string) ([]cluster.Cluster, error) { diff --git a/src/clm/pkg/cluster/cluster.go b/src/clm/pkg/cluster/cluster.go index 06faafd2..ac7f31f7 100644 --- a/src/clm/pkg/cluster/cluster.go +++ b/src/clm/pkg/cluster/cluster.go @@ -101,7 +101,7 @@ type ClusterManager interface { CreateCluster(provider string, pr Cluster, qr ClusterContent) (Cluster, error) GetCluster(provider, name string) (Cluster, error) GetClusterContent(provider, name string) (ClusterContent, error) - GetClusterContext(provider, name string) (appcontext.AppContext, error) + GetClusterContext(provider, name string) (appcontext.AppContext, string, error) GetClusters(provider string) ([]Cluster, error) GetClustersWithLabel(provider, label string) ([]string, error) DeleteCluster(provider, name string) error @@ -310,7 +310,7 @@ func (v *ClusterClient) GetClusterContent(provider, name string) (ClusterContent } // GetClusterContext returns the AppContext for corresponding provider and name -func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.AppContext, error) { +func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.AppContext, string, error) { //Construct key and tag to select the entry key := ClusterKey{ ClusterProviderName: provider, @@ -319,7 +319,7 @@ func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.App value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagContext) if err != nil { - return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get Cluster Context") + return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Get Cluster Context") } //value is a byte array @@ -328,12 +328,12 @@ func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.App var cc appcontext.AppContext _, err = cc.LoadAppContext(ctxVal) if err != nil { - return appcontext.AppContext{}, pkgerrors.Wrap(err, "Reinitializing Cluster AppContext") + return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Reinitializing Cluster AppContext") } - return cc, nil + return cc, ctxVal, nil } - return appcontext.AppContext{}, pkgerrors.New("Error getting Cluster AppContext") + return appcontext.AppContext{}, "", pkgerrors.New("Error getting Cluster AppContext") } // GetClusters returns all the Clusters for corresponding provider @@ -393,7 +393,7 @@ func (v *ClusterClient) DeleteCluster(provider, name string) error { ClusterProviderName: provider, ClusterName: name, } - _, err := v.GetClusterContext(provider, name) + _, _, err := v.GetClusterContext(provider, name) if err == nil { return pkgerrors.Errorf("Cannot delete cluster until context is deleted: %v, %v", provider, name) } diff --git a/src/monitor/build/Dockerfile b/src/monitor/build/Dockerfile index 812eb47e..9ecff169 100644 --- a/src/monitor/build/Dockerfile +++ b/src/monitor/build/Dockerfile @@ -1,15 +1,16 @@ -FROM registry.access.redhat.com/ubi7/ubi-minimal:latest +FROM golang:1.14.1 -ENV OPERATOR=/usr/local/bin/monitor \ - USER_UID=1001 \ - USER_NAME=monitor +WORKDIR /go/src/github.com/onap/multicloud-k8s/src/monitor +COPY ./ ./ +RUN make all -# install operator binary -COPY _output/bin/monitor ${OPERATOR} +FROM ubuntu:16.04 -COPY bin /usr/local/bin -RUN /usr/local/bin/user_setup +WORKDIR /opt/monitor +RUN groupadd -r monitor && useradd -r -g monitor monitor +RUN chown monitor:monitor /opt/monitor -R +COPY --chown=monitor --from=0 /go/src/github.com/onap/multicloud-k8s/src/monitor/monitor ./ -ENTRYPOINT ["/usr/local/bin/entrypoint"] +USER monitor +ENTRYPOINT ["/opt/monitor/monitor"] -USER ${USER_UID} diff --git a/src/monitor/deploy/cluster_role.yaml b/src/monitor/deploy/cluster_role.yaml new file mode 100644 index 00000000..0732e8d3 --- /dev/null +++ b/src/monitor/deploy/cluster_role.yaml @@ -0,0 +1,72 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + creationTimestamp: null + name: monitor +rules: +- apiGroups: + - "" + resources: + - pods + - services + - endpoints + - persistentvolumeclaims + - events + - configmaps + - secrets + verbs: + - '*' +- apiGroups: + - apps + resources: + - deployments + - daemonsets + - replicasets + - statefulsets + verbs: + - '*' +- apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - get + - create +- apiGroups: + - apps + resourceNames: + - monitor + resources: + - deployments/finalizers + verbs: + - update +- apiGroups: + - "" + resources: + - pods + verbs: + - get +- apiGroups: + - apps + resources: + - replicasets + verbs: + - get +- apiGroups: + - k8splugin.io + resources: + - '*' + verbs: + - '*' +- apiGroups: + - batch + resources: + - '*' + verbs: + - '*' +- apiGroups: + - extensions + resources: + - '*' + verbs: + - '*' diff --git a/src/monitor/deploy/clusterrole_binding.yaml b/src/monitor/deploy/clusterrole_binding.yaml new file mode 100644 index 00000000..73e74039 --- /dev/null +++ b/src/monitor/deploy/clusterrole_binding.yaml @@ -0,0 +1,12 @@ +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: monitor +subjects: +- kind: ServiceAccount + name: monitor + namespace: default +roleRef: + kind: ClusterRole + name: monitor + apiGroup: rbac.authorization.k8s.io diff --git a/src/monitor/deploy/monitor-cleanup.sh b/src/monitor/deploy/monitor-cleanup.sh new file mode 100755 index 00000000..21725073 --- /dev/null +++ b/src/monitor/deploy/monitor-cleanup.sh @@ -0,0 +1,6 @@ +kubectl delete rolebinding monitor +kubectl delete clusterrolebinding monitor +kubectl delete role monitor +kubectl delete clusterrole monitor +kubectl delete serviceaccount monitor +kubectl delete deploy monitor diff --git a/src/monitor/deploy/monitor-deploy.sh b/src/monitor/deploy/monitor-deploy.sh new file mode 100755 index 00000000..47c7120f --- /dev/null +++ b/src/monitor/deploy/monitor-deploy.sh @@ -0,0 +1,6 @@ +kubectl apply -f role.yaml +kubectl apply -f cluster_role.yaml +kubectl apply -f role_binding.yaml +kubectl apply -f clusterrole_binding.yaml +kubectl apply -f service_account.yaml +kubectl apply -f operator.yaml diff --git a/src/monitor/deploy/operator.yaml b/src/monitor/deploy/operator.yaml index a06c07d3..93e4522c 100644 --- a/src/monitor/deploy/operator.yaml +++ b/src/monitor/deploy/operator.yaml @@ -3,30 +3,28 @@ kind: Deployment metadata: name: monitor labels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" spec: replicas: 1 selector: matchLabels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" template: metadata: labels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" spec: serviceAccountName: monitor containers: - name: monitor # Replace this with the built image name - image: k8splugin.io/monitor:latest + image: ewmduck/monitor:latest command: - - monitor + - /opt/monitor/monitor imagePullPolicy: IfNotPresent env: - name: WATCH_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace + value: "" - name: POD_NAME valueFrom: fieldRef: diff --git a/src/monitor/deploy/role.yaml b/src/monitor/deploy/role.yaml index 4d0fd1b6..c48141ac 100644 --- a/src/monitor/deploy/role.yaml +++ b/src/monitor/deploy/role.yaml @@ -58,3 +58,15 @@ rules: - '*' verbs: - '*' +- apiGroups: + - batch + resources: + - '*' + verbs: + - '*' +- apiGroups: + - extensions + resources: + - '*' + verbs: + - '*' diff --git a/src/monitor/go.mod b/src/monitor/go.mod index ec48d268..6eff59af 100644 --- a/src/monitor/go.mod +++ b/src/monitor/go.mod @@ -32,6 +32,4 @@ replace ( sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde ) -// Remove hg dependency using this mirror replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.9.0 - diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go index 231f226e..064591fc 100644 --- a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go +++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go @@ -15,8 +15,8 @@ import ( // +kubebuilder:subresource:status // +genclient type ResourceBundleState struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + metav1.TypeMeta `json:",inline" yaml:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata" yaml:"metadata"` Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` diff --git a/src/ncm/go.mod b/src/ncm/go.mod index 233d8880..d3d2924a 100644 --- a/src/ncm/go.mod +++ b/src/ncm/go.mod @@ -5,6 +5,8 @@ require ( github.com/gorilla/handlers v1.3.0 github.com/gorilla/mux v1.7.3 github.com/k8snetworkplumbingwg/network-attachment-definition-client v0.0.0-20200127152046-0ee521d56061 + github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000 + github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 github.com/pkg/errors v0.8.1 google.golang.org/grpc v1.27.1 gopkg.in/yaml.v2 v2.2.8 diff --git a/src/ncm/internal/grpc/rsyncclient.go b/src/ncm/internal/grpc/rsyncclient.go deleted file mode 100644 index 5eb870a7..00000000 --- a/src/ncm/internal/grpc/rsyncclient.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright 2020 Intel Corporation. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package grpc - -import ( - log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc" - controller "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller" -) - -const RsyncName = "rsync" - -// InitRsyncClient initializes connctions to the Resource Synchronizer serivice -func InitRsyncClient() bool { - client := controller.NewControllerClient() - - vals, _ := client.GetControllers() - found := false - for _, v := range vals { - if v.Metadata.Name == RsyncName { - log.Info("Initializing RPC connection to resource synchronizer", log.Fields{ - "Controller": v.Metadata.Name, - }) - rpc.UpdateRpcConn(v.Metadata.Name, v.Spec.Host, v.Spec.Port) - found = true - break - } - } - return found -} diff --git a/src/ncm/pkg/scheduler/scheduler.go b/src/ncm/pkg/scheduler/scheduler.go index 29d67662..4886a67e 100644 --- a/src/ncm/pkg/scheduler/scheduler.go +++ b/src/ncm/pkg/scheduler/scheduler.go @@ -17,20 +17,16 @@ package scheduler import ( - "context" "encoding/json" - "time" clusterPkg "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" - "github.com/onap/multicloud-k8s/src/ncm/internal/grpc" oc "github.com/onap/multicloud-k8s/src/ncm/internal/ovncontroller" ncmtypes "github.com/onap/multicloud-k8s/src/ncm/pkg/module/types" nettypes "github.com/onap/multicloud-k8s/src/ncm/pkg/networkintents/types" appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/grpc/installappclient" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc" - installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" pkgerrors "github.com/pkg/errors" ) @@ -63,7 +59,7 @@ func NewSchedulerClient() *SchedulerClient { // Apply Network Intents associated with a cluster func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) error { - _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) + _, _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) if err == nil { return pkgerrors.Errorf("Cluster network intents have already been applied: %v, %v", clusterProvider, cluster) } @@ -157,30 +153,9 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e } // call resource synchronizer to instantiate the CRs in the cluster - conn := rpc.GetRpcConn(grpc.RsyncName) - if conn == nil { - grpc.InitRsyncClient() - conn = rpc.GetRpcConn(grpc.RsyncName) - } - - var rpcClient installpb.InstallappClient - var installRes *installpb.InstallAppResponse - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if conn != nil { - rpcClient = installpb.NewInstallappClient(conn) - installReq := new(installpb.InstallAppRequest) - installReq.AppContext = ctxVal.(string) - installRes, err = rpcClient.InstallApp(ctx, installReq) - if err == nil { - log.Info("Response from InstappApp GRPC call", log.Fields{ - "Succeeded": installRes.AppContextInstalled, - "Message": installRes.AppContextInstallMessage, - }) - } - } else { - return pkgerrors.Errorf("InstallApp Failed - Could not get InstallAppClient: %v", grpc.RsyncName) + err = installappclient.InvokeInstallApp(ctxVal.(string)) + if err != nil { + return err } return nil @@ -188,12 +163,16 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e // Terminate Network Intents associated with a cluster func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster string) error { - context, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) + context, ctxVal, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) if err != nil { return pkgerrors.Wrapf(err, "Error finding AppContext for cluster: %v, %v", clusterProvider, cluster) } - // TODO: call resource synchronizer to terminate the CRs in the cluster + // call resource synchronizer to terminate the CRs in the cluster + err = installappclient.InvokeUninstallApp(ctxVal) + if err != nil { + return err + } // remove the app context cleanuperr := context.DeleteCompositeApp() diff --git a/src/orchestrator/api/api.go b/src/orchestrator/api/api.go index 2470a1be..5abbb96d 100644 --- a/src/orchestrator/api/api.go +++ b/src/orchestrator/api/api.go @@ -180,6 +180,8 @@ func NewRouter(projectClient moduleLib.ProjectManager, } router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/instantiate", instantiationHandler.instantiateHandler).Methods("POST") + router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/terminate", instantiationHandler.terminateHandler).Methods("POST") + router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/status", instantiationHandler.statusHandler).Methods("GET") return router } diff --git a/src/orchestrator/api/instantiation_handler.go b/src/orchestrator/api/instantiation_handler.go index c95785f2..ce50e5b8 100644 --- a/src/orchestrator/api/instantiation_handler.go +++ b/src/orchestrator/api/instantiation_handler.go @@ -17,9 +17,11 @@ package api import ( + "encoding/json" + "net/http" + "github.com/gorilla/mux" moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module" - "net/http" ) /* Used to store backend implementation objects @@ -45,3 +47,45 @@ func (h instantiationHandler) instantiateHandler(w http.ResponseWriter, r *http. w.WriteHeader(http.StatusAccepted) } + +func (h instantiationHandler) terminateHandler(w http.ResponseWriter, r *http.Request) { + + vars := mux.Vars(r) + p := vars["project-name"] + ca := vars["composite-app-name"] + v := vars["composite-app-version"] + di := vars["deployment-intent-group-name"] + + iErr := h.client.Terminate(p, ca, v, di) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) + +} + +func (h instantiationHandler) statusHandler(w http.ResponseWriter, r *http.Request) { + + vars := mux.Vars(r) + p := vars["project-name"] + ca := vars["composite-app-name"] + v := vars["composite-app-version"] + di := vars["deployment-intent-group-name"] + + status, iErr := h.client.Status(p, ca, v, di) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + iErr = json.NewEncoder(w).Encode(status) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) + +} diff --git a/src/orchestrator/go.mod b/src/orchestrator/go.mod index 223dc068..3f14f00b 100644 --- a/src/orchestrator/go.mod +++ b/src/orchestrator/go.mod @@ -2,17 +2,12 @@ module github.com/onap/multicloud-k8s/src/orchestrator require ( github.com/MakeNowJust/heredoc v1.0.0 // indirect - github.com/Masterminds/goutils v1.1.0 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1 github.com/coreos/etcd v3.3.12+incompatible - github.com/cyphar/filepath-securejoin v0.2.2 // indirect github.com/docker/docker v1.13.1 // indirect - github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect - github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect github.com/ghodss/yaml v1.0.0 - github.com/gobwas/glob v0.2.3 // indirect github.com/golang/protobuf v1.4.1 github.com/gorilla/handlers v1.3.0 github.com/gorilla/mux v1.7.3 @@ -21,34 +16,32 @@ require ( github.com/lib/pq v1.6.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mitchellh/copystructure v1.0.0 // indirect - github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000 + github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200630152613-7c20f73e7c5d github.com/onap/multicloud-k8s/src/ncm v0.0.0-20200515060444-c77850a75eee + github.com/onap/multicloud-k8s/src/rsync v0.0.0-20200630152613-7c20f73e7c5d github.com/pkg/errors v0.8.1 github.com/rubenv/sql-migrate v0.0.0-20200429072036-ae26b214fa43 // indirect github.com/russross/blackfriday v1.5.2 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v1.0.0 // indirect - github.com/technosophos/moniker v0.0.0-20180509230615-a5dbd03a2245 // indirect go.etcd.io/etcd v3.3.12+incompatible go.mongodb.org/mongo-driver v1.0.0 golang.org/x/net v0.0.0-20200301022130-244492dfa37a google.golang.org/grpc v1.27.1 google.golang.org/protobuf v1.24.0 gopkg.in/square/go-jose.v2 v2.5.1 // indirect + gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86 - k8s.io/apiextensions-apiserver v0.0.0-00010101000000-000000000000 // indirect k8s.io/apimachinery v0.0.0-20190831074630-461753078381 - k8s.io/apiserver v0.0.0-00010101000000-000000000000 // indirect - k8s.io/cli-runtime v0.0.0-00010101000000-000000000000 // indirect k8s.io/cloud-provider v0.0.0-00010101000000-000000000000 // indirect k8s.io/helm v2.14.3+incompatible sigs.k8s.io/kustomize v2.0.3+incompatible // indirect - vbom.ml/util v0.0.0-20180919145318-efcd4e0f9787 // indirect ) replace ( github.com/onap/multicloud-k8s/src/clm => ../clm + github.com/onap/multicloud-k8s/src/monitor => ../monitor k8s.io/api => k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go index a847ae32..cdf23bfa 100644 --- a/src/orchestrator/pkg/appcontext/appcontext.go +++ b/src/orchestrator/pkg/appcontext/appcontext.go @@ -18,10 +18,11 @@ package appcontext import ( "fmt" + "strings" + log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/rtcontext" pkgerrors "github.com/pkg/errors" - "strings" ) // metaPrefix used for denoting clusterMeta level @@ -413,6 +414,59 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string, return v, nil } +//AddStatus for holding status of all resources under app and cluster +// handle should be a cluster handle +func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) { + h, err := ac.rtc.RtcAddStatus(handle, value) + if err != nil { + return nil, err + } + log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h}) + + return h, nil +} + +//DeleteStatus for the given the handle +func (ac *AppContext) DeleteStatus(handle interface{}) error { + err := ac.rtc.RtcDeletePair(handle) + if err != nil { + return err + } + return nil +} + +//Return the handle for status for a given app and cluster +func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) { + if appname == "" { + return nil, pkgerrors.Errorf("Not a valid run time context app name") + } + if clustername == "" { + return nil, pkgerrors.Errorf("Not a valid run time context cluster name") + } + + rh, err := ac.rtc.RtcGet() + if err != nil { + return nil, err + } + + acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/status/" + hs, err := ac.rtc.RtcGetHandles(acrh) + if err != nil { + return nil, err + } + for _, v := range hs { + if v == acrh { + return v, nil + } + } + return nil, pkgerrors.Errorf("No handle was found for the given resource") +} + +//UpdateStatusValue updates the status value with the given handle +func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) error { + return ac.rtc.RtcUpdateValue(handle, value) +} + //Return all the handles under the composite app func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) { hs, err := ac.rtc.RtcGetHandles(handle) diff --git a/src/orchestrator/pkg/appcontext/appcontext_test.go b/src/orchestrator/pkg/appcontext/appcontext_test.go index 05c73703..92c43113 100644 --- a/src/orchestrator/pkg/appcontext/appcontext_test.go +++ b/src/orchestrator/pkg/appcontext/appcontext_test.go @@ -145,6 +145,10 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{ return c.Err } +func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { + return nil, nil +} + func TestCreateCompositeApp(t *testing.T) { var ac = AppContext{} testCases := []struct { diff --git a/src/orchestrator/pkg/grpc/installappclient/client.go b/src/orchestrator/pkg/grpc/installappclient/client.go index 4c652a84..0e9141a6 100644 --- a/src/orchestrator/pkg/grpc/installappclient/client.go +++ b/src/orchestrator/pkg/grpc/installappclient/client.go @@ -19,10 +19,32 @@ import ( log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller" installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" pkgerrors "github.com/pkg/errors" ) +const rsyncName = "rsync" + +// InitRsyncClient initializes connctions to the Resource Synchronizer service +func initRsyncClient() bool { + client := controller.NewControllerClient() + + vals, _ := client.GetControllers() + found := false + for _, v := range vals { + if v.Metadata.Name == rsyncName { + log.Info("Initializing RPC connection to resource synchronizer", log.Fields{ + "Controller": v.Metadata.Name, + }) + rpc.UpdateRpcConn(v.Metadata.Name, v.Spec.Host, v.Spec.Port) + found = true + break + } + } + return found +} + // InvokeInstallApp will make the grpc call to the resource synchronizer // or rsync controller. // rsync will deply the resources in the app context to the clusters as @@ -34,7 +56,11 @@ func InvokeInstallApp(appContextId string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - conn := rpc.GetRpcConn("rsync") + conn := rpc.GetRpcConn(rsyncName) + if conn == nil { + initRsyncClient() + conn = rpc.GetRpcConn(rsyncName) + } if conn != nil { rpcClient = installpb.NewInstallappClient(conn) @@ -64,3 +90,41 @@ func InvokeInstallApp(appContextId string) error { } return err } + +func InvokeUninstallApp(appContextId string) error { + var err error + var rpcClient installpb.InstallappClient + var uninstallRes *installpb.UninstallAppResponse + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn := rpc.GetRpcConn("rsync") + + if conn != nil { + rpcClient = installpb.NewInstallappClient(conn) + uninstallReq := new(installpb.UninstallAppRequest) + uninstallReq.AppContext = appContextId + uninstallRes, err = rpcClient.UninstallApp(ctx, uninstallReq) + if err == nil { + log.Info("Response from UninstappApp GRPC call", log.Fields{ + "Succeeded": uninstallRes.AppContextUninstalled, + "Message": uninstallRes.AppContextUninstallMessage, + }) + } + } else { + return pkgerrors.Errorf("UninstallApp Failed - Could not get InstallAppClient: %v", "rsync") + } + + if err == nil { + if uninstallRes.AppContextUninstalled { + log.Info("UninstallApp Success", log.Fields{ + "AppContext": appContextId, + "Message": uninstallRes.AppContextUninstallMessage, + }) + return nil + } else { + return pkgerrors.Errorf("UninstallApp Failed: %v", uninstallRes.AppContextUninstallMessage) + } + } + return err +} diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go index 16a14c7b..0decb68f 100644 --- a/src/orchestrator/pkg/module/deployment_intent_groups.go +++ b/src/orchestrator/pkg/module/deployment_intent_groups.go @@ -20,6 +20,7 @@ import ( "encoding/json" "reflect" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" pkgerrors "github.com/pkg/errors" @@ -61,6 +62,7 @@ type OverrideValues struct { type DeploymentIntentGroupManager interface { CreateDeploymentIntentGroup(d DeploymentIntentGroup, p string, ca string, v string) (DeploymentIntentGroup, error) GetDeploymentIntentGroup(di string, p string, ca string, v string) (DeploymentIntentGroup, error) + GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, string, error) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error } @@ -86,6 +88,7 @@ func (dk DeploymentIntentGroupKey) String() string { type DeploymentIntentGroupClient struct { storeName string tagMetaData string + tagContext string } // NewDeploymentIntentGroupClient return an instance of DeploymentIntentGroupClient which implements DeploymentIntentGroupManager @@ -93,6 +96,7 @@ func NewDeploymentIntentGroupClient() *DeploymentIntentGroupClient { return &DeploymentIntentGroupClient{ storeName: "orchestrator", tagMetaData: "deploymentintentgroupmetadata", + tagContext: "contextid", } } @@ -160,6 +164,34 @@ func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroup(di string, p stri } +// GetDeploymentIntentGroup returns the DeploymentIntentGroup with a given name, project, compositeApp and version of compositeApp +func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, string, error) { + + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, + } + + result, err := db.DBconn.Find(c.storeName, key, c.tagContext) + if err != nil { + return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Get DeploymentIntentGroup Context error") + } + + if result != nil { + ctxVal := string(result[0]) + var cc appcontext.AppContext + _, err = cc.LoadAppContext(ctxVal) + if err != nil { + return appcontext.AppContext{}, "", pkgerrors.Wrap(err, "Error loading DeploymentIntentGroup Appcontext") + } + return cc, ctxVal, nil + } + + return appcontext.AppContext{}, "", pkgerrors.New("Error getting DeploymentIntentGroup AppContext") +} + // DeleteDeploymentIntentGroup deletes a DeploymentIntentGroup func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error { k := DeploymentIntentGroupKey{ @@ -168,10 +200,14 @@ func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p s CompositeApp: ca, Version: v, } + _, _, err := c.GetDeploymentIntentGroupContext(di, p, ca, v) + if err == nil { + return pkgerrors.Wrap(err, "DeploymentIntentGroup must be terminated before it can be deleted "+di) + } - err := db.DBconn.Remove(c.storeName, k) + err = db.DBconn.Remove(c.storeName, k) if err != nil { - return pkgerrors.Wrap(err, "Delete DeploymentIntentGroup entry;") + return pkgerrors.Wrap(err, "Error deleting DeploymentIntentGroup entry") } return nil diff --git a/src/orchestrator/pkg/module/instantiation.go b/src/orchestrator/pkg/module/instantiation.go index 043b80f2..1f2e1117 100644 --- a/src/orchestrator/pkg/module/instantiation.go +++ b/src/orchestrator/pkg/module/instantiation.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" + rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" @@ -42,17 +43,25 @@ type InstantiationClient struct { db InstantiationClientDbInfo } +type ClusterAppStatus struct { + Cluster string + App string + Status rb.ResourceBundleStatus +} + +type StatusData struct { + Data []ClusterAppStatus +} + /* InstantiationKey used in storing the contextid in the momgodb It consists of -GenericPlacementIntentName, ProjectName, CompositeAppName, CompositeAppVersion, DeploymentIntentGroup */ type InstantiationKey struct { - IntentName string Project string CompositeApp string Version string @@ -64,6 +73,8 @@ type InstantiationKey struct { type InstantiationManager interface { //ApproveInstantiation(p string, ca string, v string, di string) (error) Instantiate(p string, ca string, v string, di string) error + Status(p string, ca string, v string, di string) (StatusData, error) + Terminate(p string, ca string, v string, di string) error } // InstantiationClientDbInfo consists of storeName and tagContext @@ -177,6 +188,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin if err != nil { return pkgerrors.Wrap(err, "Not finding the deploymentIntentGroup") } + + _, _, err = NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + if err == nil { + return pkgerrors.Errorf("DeploymentIntentGroup has already been instantiated: " + di) + } + rName := dIGrp.Spec.Version //rName is releaseName overrideValues := dIGrp.Spec.OverrideValuesObj cp := dIGrp.Spec.Profile @@ -229,6 +246,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name) } + statusResource, err := getStatusResource(ctxval.(string), eachApp.Metadata.Name) + if err != nil { + return pkgerrors.Wrapf(err, "Unable to generate the status resource for app :: %s", eachApp.Metadata.Name) + } + resources = append(resources, statusResource) + specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent) if err != nil { return pkgerrors.Wrap(err, "Unable to get the intents for app") @@ -269,12 +292,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin //END: storing into etcd // BEGIN:: save the context in the orchestrator db record - key := InstantiationKey{ - IntentName: gIntent, - Project: p, - CompositeApp: ca, - Version: v, - DeploymentIntentGroup: di, + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, } err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagContext, ctxval) @@ -315,7 +337,7 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin // END: Scheduler code // BEGIN : Rsync code - err = callRsync(ctxval) + err = callRsyncInstall(ctxval) if err != nil { return err } @@ -324,3 +346,101 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca}) return err } + +/* +Status takes in projectName, compositeAppName, compositeAppVersion, +DeploymentIntentName. This method is responsible obtaining the status of +the deployment, which is made available in the appcontext. +*/ +func (c InstantiationClient) Status(p string, ca string, v string, di string) (StatusData, error) { + + ac, _, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + if err != nil { + return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found: "+di) + } + + // Get all apps in this composite app + allApps, err := NewAppClient().GetApps(p, ca, v) + if err != nil { + return StatusData{}, pkgerrors.Wrap(err, "Not finding the apps") + } + + var diStatus StatusData + diStatus.Data = make([]ClusterAppStatus, 0) + + // Loop through each app and get the status data for each cluster in the app + for _, app := range allApps { + // Get the clusters in the appcontext for this app + clusters, err := ac.GetClusterNames(app.Metadata.Name) + if err != nil { + log.Info(":: No clusters for app ::", log.Fields{"AppName": app.Metadata.Name}) + continue + } + + for _, cluster := range clusters { + handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster) + if err != nil { + log.Info(":: No status handle for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + statusValue, err := ac.GetValue(handle) + if err != nil { + log.Info(":: No status value for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + log.Info(":: STATUS VALUE ::", log.Fields{"statusValue": statusValue}) + var statusData ClusterAppStatus + err = json.Unmarshal([]byte(statusValue.(string)), &statusData.Status) + if err != nil { + log.Info(":: Error unmarshaling status value for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + statusData.Cluster = cluster + statusData.App = app.Metadata.Name + log.Info(":: STATUS DATA ::", log.Fields{"status": statusData}) + + diStatus.Data = append(diStatus.Data, statusData) + } + } + + return diStatus, nil +} + +/* +Terminate takes in projectName, compositeAppName, compositeAppVersion, +DeploymentIntentName and calls rsync to terminate. +*/ +func (c InstantiationClient) Terminate(p string, ca string, v string, di string) error { + + ac, ctxval, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + if err != nil { + return pkgerrors.Wrap(err, "DeploymentIntentGroup has no app context: "+di) + } + + err = callRsyncUninstall(ctxval) + if err != nil { + return err + } + + err = ac.DeleteCompositeApp() + if err != nil { + return pkgerrors.Wrap(err, "Error deleting the app context for DeploymentIntentGroup: "+di) + } + + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, + } + + err = db.DBconn.RemoveTag(c.db.storeName, key, c.db.tagContext) + if err != nil { + return pkgerrors.Wrap(err, "Error removing the app context tag from DeploymentIntentGroup: "+di) + } + + return nil +} diff --git a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go index 43ddd6df..e6e2bf30 100644 --- a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go +++ b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go @@ -25,12 +25,16 @@ import ( "encoding/json" "io/ioutil" + jyaml "github.com/ghodss/yaml" + + rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" "github.com/onap/multicloud-k8s/src/orchestrator/utils" "github.com/onap/multicloud-k8s/src/orchestrator/utils/helm" pkgerrors "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // resource consists of name of reource @@ -90,6 +94,45 @@ func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) { return resources, nil } +// addStatusResource adds a status monitoring resource to the app +// which consists of name(name+kind) and content +func getStatusResource(id, app string) (resource, error) { + + var statusCr rb.ResourceBundleState + + label := id + "-" + app + name := app + "-" + id + + statusCr.TypeMeta.APIVersion = "k8splugin.io/v1alpha1" + statusCr.TypeMeta.Kind = "ResourceBundleState" + statusCr.SetName(name) + + labels := make(map[string]string) + labels["emco/deployment-id"] = label + statusCr.SetLabels(labels) + + labelSelector, err := metav1.ParseToLabelSelector("emco/deployment-id = " + label) + if err != nil { + log.Info(":: ERROR Parsing Label Selector ::", log.Fields{"Error": err}) + } else { + statusCr.Spec.Selector = labelSelector + } + + // Marshaling to json then convert to yaml works better than marshaling to yaml + // The 'apiVersion' attribute was marshaling to 'apiversion' + // y, _ := yaml.Marshal(&statusCr) + j, _ := json.Marshal(&statusCr) + y, _ := jyaml.JSONToYAML(j) + log.Info(":: RESULTING STATUS CR ::", log.Fields{"StatusCR": y}) + + statusResource := resource{ + name: name + "+" + "ResourceBundleState", + filecontent: string(y), + } + + return statusResource, nil +} + func addResourcesToCluster(ct appcontext.AppContext, ch interface{}, resources []resource) error { var resOrderInstr struct { diff --git a/src/orchestrator/pkg/module/instantiation_scheduler_helper.go b/src/orchestrator/pkg/module/instantiation_scheduler_helper.go index 3d9d851c..184d6972 100644 --- a/src/orchestrator/pkg/module/instantiation_scheduler_helper.go +++ b/src/orchestrator/pkg/module/instantiation_scheduler_helper.go @@ -192,9 +192,9 @@ func callGrpcForControllerList(cl []controller.Controller, mc map[string]string, } /* -callRsync method shall take in the app context id and invokes the rsync service via grpc +callRsyncInstall method shall take in the app context id and invokes the rsync service via grpc */ -func callRsync(contextid interface{}) error { +func callRsyncInstall(contextid interface{}) error { appContextID := fmt.Sprintf("%v", contextid) err := rsyncclient.InvokeInstallApp(appContextID) if err != nil { @@ -204,6 +204,18 @@ func callRsync(contextid interface{}) error { } /* +callRsyncUninstall method shall take in the app context id and invokes the rsync service via grpc +*/ +func callRsyncUninstall(contextid interface{}) error { + appContextID := fmt.Sprintf("%v", contextid) + err := rsyncclient.InvokeUninstallApp(appContextID) + if err != nil { + return err + } + return nil +} + +/* deleteExtraClusters method shall delete the extra cluster handles for each AnyOf cluster present in the etcd after the grpc call for context updation. */ func deleteExtraClusters(apps []App, ct appcontext.AppContext) error { diff --git a/src/orchestrator/pkg/rtcontext/rtcontext.go b/src/orchestrator/pkg/rtcontext/rtcontext.go index 432c5d87..f3905eb0 100644 --- a/src/orchestrator/pkg/rtcontext/rtcontext.go +++ b/src/orchestrator/pkg/rtcontext/rtcontext.go @@ -18,11 +18,12 @@ package rtcontext import ( "fmt" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb" - pkgerrors "github.com/pkg/errors" "math/rand" "strings" "time" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb" + pkgerrors "github.com/pkg/errors" ) const maxrand = 0x7fffffffffffffff @@ -40,6 +41,7 @@ type Rtcontext interface { RtcAddMeta(meta interface{}) error RtcGet() (interface{}, error) RtcAddLevel(handle interface{}, level string, value string) (interface{}, error) + RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error) RtcDeletePair(handle interface{}) error @@ -201,6 +203,26 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in return (interface{})(key), nil } +// Add status under the given level and return new handle +func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { + + str := fmt.Sprintf("%v", handle) + sid := fmt.Sprintf("%v", rtc.cid) + if !strings.HasPrefix(str, sid) { + return nil, pkgerrors.Errorf("Not a valid run time context handle") + } + if value == nil { + return nil, pkgerrors.Errorf("Not a valid run time context resource value") + } + + k := str + "status" + "/" + err := contextdb.Db.Put(k, value) + if err != nil { + return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error()) + } + return (interface{})(k), nil +} + // Add a resource under the given level and return new handle func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) { diff --git a/src/rsync/go.mod b/src/rsync/go.mod index a2c5f83b..c6831e5e 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -5,12 +5,7 @@ go 1.13 require ( github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 - github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect - github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect - github.com/mattn/go-isatty v0.0.4 // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 - go.etcd.io/bbolt v1.3.3 // indirect google.golang.org/grpc v1.27.1 k8s.io/kubernetes v1.14.1 ) diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go index 6e17f87a..fc8aa839 100644 --- a/src/rsync/pkg/connector/connector.go +++ b/src/rsync/pkg/connector/connector.go @@ -19,8 +19,6 @@ package connector import ( "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -52,7 +50,7 @@ type KubernetesConnector interface { // Reference is the interface that is implemented type Reference interface { //Create a kubernetes resource described by the yaml in yamlFilePath - Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error) + Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error) //Delete a kubernetes resource described in the provided namespace Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error } @@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = tag + labels["emco/deployment-id"] = tag podTemplateSpec.SetLabels(labels) updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index e5da1296..7e0fce3c 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -18,67 +18,68 @@ package context import ( "encoding/json" - "fmt" - "log" - "sync" - "strings" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - pkgerrors "github.com/pkg/errors" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + "fmt" + "log" + "strings" + "sync" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" + pkgerrors "github.com/pkg/errors" ) type CompositeAppContext struct { - cid interface{} + cid interface{} appsorder string appsdependency string - appsmap []instMap + appsmap []instMap } type clusterInfo struct { - name string + name string resorder string resdependency string - ressmap []instMap + ressmap []instMap } type instMap struct { - name string - depinfo string - status string - rerr error + name string + depinfo string + status string + rerr error clusters []clusterInfo } func getInstMap(order string, dependency string, level string) ([]instMap, error) { - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } + if order == "" { + return nil, pkgerrors.Errorf("Not a valid order value") + } + if dependency == "" { + return nil, pkgerrors.Errorf("Not a valid dependency value") + } + if !(level == "app" || level == "res") { + return nil, pkgerrors.Errorf("Not a valid level name given to create map") + } - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) + var aov map[string]interface{} + json.Unmarshal([]byte(order), &aov) - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) + s := fmt.Sprintf("%vorder", level) + appso := aov[s].([]interface{}) + var instmap = make([]instMap, len(appso)) - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } + var adv map[string]interface{} + json.Unmarshal([]byte(dependency), &adv) + s = fmt.Sprintf("%vdependency", level) + appsd := adv[s].(map[string]interface{}) + for i, u := range appso { + instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} + } - return instmap, nil + return instmap, nil } func deleteResource(clustername string, resname string, respath string) error { @@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error { var gp res.Resource err = gp.Delete(respath, resname, "default", c) if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) + log.Println("Delete resource failed: " + err.Error() + resname) return err } log.Println("Resource succesfully deleted", resname) @@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error { } -func createResource(clustername string, resname string, respath string) error { +func createResource(clustername string, resname string, respath string, label string) error { k8sClient := app.KubernetesClient{} err := k8sClient.Init(clustername, resname) if err != nil { @@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error { var c con.KubernetesConnector c = &k8sClient var gp res.Resource - _, err = gp.Create(respath,"default", c) + _, err = gp.Create(respath, "default", label, c) if err != nil { - log.Println("Create failed: " + err.Error() + resname) + log.Println("Create failed: " + err.Error() + resname) return err } log.Println("Resource succesfully created", resname) @@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, } -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { +func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) if err != nil { return err @@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin if result[0] == "" { return pkgerrors.Errorf("Resource name is nil") } - err = createResource(clustername, result[0], resval.(string)) + err = createResource(clustername, result[0], resval.(string), label) if err != nil { return err } @@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources termination") + } + } + return nil } -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + cid, _ := ac.GetCompositeAppHandle() + + results := strings.Split(cid.(string), "/") + label := results[2] + "-" + appname + + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources instantiation") + } + } + return nil } func terminateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } @@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error { } - func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } + err = status.StartClusterWatcher(appmap.clusters[i].name) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) + } } log.Println("Instantiation of app done: " + appmap.name) return nil } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { +func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = instantiateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = instantiateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil } // Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { +func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = terminateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { return nil } + // Delete all the resources for a given context func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go index 8b45c341..2877e2a3 100644 --- a/src/rsync/pkg/resource/resource.go +++ b/src/rsync/pkg/resource/resource.go @@ -20,16 +20,15 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" ) type Resource struct { } // Create deployment object in a specific Kubernetes cluster -func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) { +func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) { if namespace == "" { namespace = "default" } @@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label unstruct.SetLabels(labels) // This checks if the resource we are creating has a podSpec in it // Eg: Deployment, StatefulSet, Job etc.. // If a PodSpec is found, the label will be added to it too. - connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + connector.TagPodsIfPresent(unstruct, label) gvr := mapping.Resource var createdObj *unstructured.Unstructured @@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne // Delete an existing resource hosted in a specific Kubernetes cluster func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - gvr := mapping.Resource - deletePolicy := metav1.DeletePropagationForeground - opts := &metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - } - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) - case meta.RESTScopeNameRoot: - err = dynClient.Resource(gvr).Delete(resname, opts) - default: - return pkgerrors.New("Got an unknown RESTSCopeName for mappin") - } - - if err != nil { - return pkgerrors.Wrap(err, "Delete object error") - } - return nil + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAMLData(data, unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + gvr := mapping.Resource + deletePolicy := metav1.DeletePropagationForeground + opts := &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) + case meta.RESTScopeNameRoot: + err = dynClient.Resource(gvr).Delete(resname, opts) + default: + return pkgerrors.New("Got an unknown RESTSCopeName for mappin") + } + + if err != nil { + return pkgerrors.Wrap(err, "Delete object error") + } + return nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go new file mode 100644 index 00000000..351da027 --- /dev/null +++ b/src/rsync/pkg/status/status.go @@ -0,0 +1,222 @@ +/* + * Copyright 2020 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package status + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + "sync" + + pkgerrors "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" + v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" + informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" +) + +type channelManager struct { + channels map[string]chan struct{} + sync.Mutex +} + +var channelData channelManager + +const monitorLabel = "emco/deployment-id" + +// HandleStatusUpdate for an application in a cluster +// TODO: Add code for specific handling +func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) { + //status := v.Status.ServiceStatuses + //podStatus := v.Status.PodStatuses + + // Get the contextId from the label (id) + result := strings.SplitN(id, "-", 2) + if result[0] == "" { + logrus.Info(clusterId, "::label is missing an appcontext identifier::", id) + return + } + + if len(result) != 2 { + logrus.Info(clusterId, "::invalid label format::", id) + return + } + + // Get the app from the label (id) + if result[1] == "" { + logrus.Info(clusterId, "::label is missing an app identifier::", id) + return + } + + // Look up the contextId + var ac appcontext.AppContext + _, err := ac.LoadAppContext(result[0]) + if err != nil { + logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err) + return + } + + // produce yaml representation of the status + vjson, err := json.Marshal(v.Status) + if err != nil { + logrus.Info(clusterId, "::Error marshalling status information::", err) + return + } + + // Get the handle for the context/app/cluster status object + handle, err := ac.GetStatusHandle(result[1], clusterId) + if err != nil { + // Expected first time + logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err) + } + + // If status handle was not found, then create the status object in the appcontext + if handle == nil { + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err) + } else { + ac.AddStatus(chandle, string(vjson)) + } + } else { + ac.UpdateStatusValue(handle, string(vjson)) + } + + return +} + +// StartClusterWatcher watches for CR +// configBytes - Kubectl file data +func StartClusterWatcher(clusterId string) error { + + configBytes, err := getKubeConfig(clusterId) + if err != nil { + return err + } + + //key := provider + "+" + name + // Get the lock + channelData.Lock() + defer channelData.Unlock() + // For first time + if channelData.channels == nil { + channelData.channels = make(map[string]chan struct{}) + } + _, ok := channelData.channels[clusterId] + if !ok { + // Create Channel + channelData.channels[clusterId] = make(chan struct{}) + // Create config + config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) + if err != nil { + logrus.Info(fmt.Sprintf("RESTConfigFromKubeConfig error: %s", err.Error())) + return pkgerrors.Wrap(err, "RESTConfigFromKubeConfig error") + } + k8sClient, err := clientset.NewForConfig(config) + if err != nil { + return pkgerrors.Wrap(err, "Clientset NewForConfig error") + } + // Create Informer + mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() + go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer) + } + return nil +} + +// StopClusterWatcher stop watching a cluster +func StopClusterWatcher(clusterId string) { + //key := provider + "+" + name + if channelData.channels != nil { + c, ok := channelData.channels[clusterId] + if ok { + close(c) + } + } +} + +// CloseAllClusterWatchers close all channels +func CloseAllClusterWatchers() { + if channelData.channels == nil { + return + } + // Close all Channels to stop all watchers + for _, e := range channelData.channels { + close(e) + } +} + +// Per Cluster Go routine to watch CR +func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) { + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(clusterId, l, v) + } + } + }, + UpdateFunc: func(oldObj, obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(clusterId, l, v) + } + } + }, + DeleteFunc: func(obj interface{}) { + // Ignore it + }, + } + s.AddEventHandler(handlers) + s.Run(c) +} + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +// TODO - consolidate with other rsync methods to get kubeconfig files +func getKubeConfig(clustername string) ([]byte, error) { + + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil +} |