diff options
26 files changed, 834 insertions, 312 deletions
diff --git a/kud/tests/cleanup-composite-vfw.sh b/deployments/kubernetes/cleanup-emco.sh index 7f96e8ac..a8aef470 100644..100755 --- a/kud/tests/cleanup-composite-vfw.sh +++ b/deployments/kubernetes/cleanup-emco.sh @@ -14,8 +14,3 @@ kubectl -n onap4k8s delete configmap orchestrator kubectl -n onap4k8s delete configmap ncm kubectl -n onap4k8s delete configmap ovnaction kubectl -n onap4k8s delete configmap rsync - -# delete the networks -kubectl delete network protected-private-net -kubectl delete providernetwork emco-private-net -kubectl delete providernetwork unprotected-private-net diff --git a/kud/tests/README-composite-vfw.txt b/kud/tests/README-composite-vfw.txt index d2018c09..3f334a25 100644 --- a/kud/tests/README-composite-vfw.txt +++ b/kud/tests/README-composite-vfw.txt @@ -7,7 +7,29 @@ As written, the vfw-test.sh script assumes 3 clusters The edge cluster in which vFW will be instantiated should be KUD clusters. -# Preparations +# Edge cluster preparation + +For status monitoring support, the 'monitor' docker image must be built and +deployed. + +In multicloud-k8s repo: + cd multicloud-k8s/src/monitor + docker build -f build/Dockerfile . -t monitor + <tag and push docker image to dockerhub ...> + +Deploy monitor program in each cluster (assumes multicloud-k8s repo is present in cloud) + # one time setup per cluster - install the CRD + cd multicloud-k8s/src/monitor/deploy/crds + kubectl apply -f crds/k8splugin_v1alpha1_resourcebundlestate_crd.yaml + + # one time setup per cluster + # update yaml files with correct image + # (cleanup first, if monitor was already installed - see monitor-cleanup.sh) + cd multicloud-k8s/src/monitor/deploy + monitor-deploy.sh + + +# Preparation of the vFW Composit Application ## Prepare the Composite vFW Application Charts and Profiles diff --git a/kud/tests/vfw-test-clean-cluster.sh b/kud/tests/vfw-test-clean-cluster.sh index 3d386466..153924ca 100644..100755 --- a/kud/tests/vfw-test-clean-cluster.sh +++ b/kud/tests/vfw-test-clean-cluster.sh @@ -1,6 +1,17 @@ +#!/bin/bash + +# script to delete vfw resources (until terminate is completed) kubectl delete deploy fw0-packetgen kubectl delete deploy fw0-firewall kubectl delete deploy fw0-sink kubectl delete service packetgen-service kubectl delete service sink-service kubectl delete configmap sink-configmap + +kubectl delete network protected-private-net +kubectl delete providernetwork emco-private-net +kubectl delete providernetwork unprotected-private-net + +for i in `kubectl get resourcebundlestate --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}'`; do + kubectl delete resourcebundlestate $i +done diff --git a/kud/tests/vfw-test.sh b/kud/tests/vfw-test.sh index 2bdddcd7..9c431ef0 100755 --- a/kud/tests/vfw-test.sh +++ b/kud/tests/vfw-test.sh @@ -969,6 +969,9 @@ function instantiateVfw { call_api -d "{ }" "${base_url_orchestrator}/projects/${projectname}/composite-apps/${vfw_compositeapp_name}/${vfw_compositeapp_version}/deployment-intent-groups/${deployment_intent_group_name}/instantiate" } +function statusVfw { + call_api "${base_url_orchestrator}/projects/${projectname}/composite-apps/${vfw_compositeapp_name}/${vfw_compositeapp_version}/deployment-intent-groups/${deployment_intent_group_name}/status" +} function usage { echo "Usage: $0 create|get|delete|apply|terminate|instantiate" @@ -986,12 +989,14 @@ function usage { echo " delete - deletes all resources in ncm, ovnaction, clm resources created for vfw" echo " apply - applys the network intents - e.g. networks created in ncm" echo " instantiate - approves and instantiates the composite app via the generic deployment intent" + echo " status - get status of deployed resources" echo " terminate - remove the network inents created by ncm" echo "" echo " a reasonable test sequence:" echo " 1. create" echo " 2. apply" echo " 3. instantiate" + echo " 4. status" exit } @@ -1050,5 +1055,6 @@ case "$1" in "apply" ) applyNcmData ;; "terminate" ) terminateNcmData ;; "instantiate" ) instantiateVfw ;; + "status" ) statusVfw ;; *) usage ;; esac diff --git a/src/monitor/build/Dockerfile b/src/monitor/build/Dockerfile index 812eb47e..9ecff169 100644 --- a/src/monitor/build/Dockerfile +++ b/src/monitor/build/Dockerfile @@ -1,15 +1,16 @@ -FROM registry.access.redhat.com/ubi7/ubi-minimal:latest +FROM golang:1.14.1 -ENV OPERATOR=/usr/local/bin/monitor \ - USER_UID=1001 \ - USER_NAME=monitor +WORKDIR /go/src/github.com/onap/multicloud-k8s/src/monitor +COPY ./ ./ +RUN make all -# install operator binary -COPY _output/bin/monitor ${OPERATOR} +FROM ubuntu:16.04 -COPY bin /usr/local/bin -RUN /usr/local/bin/user_setup +WORKDIR /opt/monitor +RUN groupadd -r monitor && useradd -r -g monitor monitor +RUN chown monitor:monitor /opt/monitor -R +COPY --chown=monitor --from=0 /go/src/github.com/onap/multicloud-k8s/src/monitor/monitor ./ -ENTRYPOINT ["/usr/local/bin/entrypoint"] +USER monitor +ENTRYPOINT ["/opt/monitor/monitor"] -USER ${USER_UID} diff --git a/src/monitor/deploy/cluster_role.yaml b/src/monitor/deploy/cluster_role.yaml new file mode 100644 index 00000000..0732e8d3 --- /dev/null +++ b/src/monitor/deploy/cluster_role.yaml @@ -0,0 +1,72 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + creationTimestamp: null + name: monitor +rules: +- apiGroups: + - "" + resources: + - pods + - services + - endpoints + - persistentvolumeclaims + - events + - configmaps + - secrets + verbs: + - '*' +- apiGroups: + - apps + resources: + - deployments + - daemonsets + - replicasets + - statefulsets + verbs: + - '*' +- apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - get + - create +- apiGroups: + - apps + resourceNames: + - monitor + resources: + - deployments/finalizers + verbs: + - update +- apiGroups: + - "" + resources: + - pods + verbs: + - get +- apiGroups: + - apps + resources: + - replicasets + verbs: + - get +- apiGroups: + - k8splugin.io + resources: + - '*' + verbs: + - '*' +- apiGroups: + - batch + resources: + - '*' + verbs: + - '*' +- apiGroups: + - extensions + resources: + - '*' + verbs: + - '*' diff --git a/src/monitor/deploy/clusterrole_binding.yaml b/src/monitor/deploy/clusterrole_binding.yaml new file mode 100644 index 00000000..73e74039 --- /dev/null +++ b/src/monitor/deploy/clusterrole_binding.yaml @@ -0,0 +1,12 @@ +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: monitor +subjects: +- kind: ServiceAccount + name: monitor + namespace: default +roleRef: + kind: ClusterRole + name: monitor + apiGroup: rbac.authorization.k8s.io diff --git a/src/monitor/deploy/monitor-cleanup.sh b/src/monitor/deploy/monitor-cleanup.sh new file mode 100755 index 00000000..21725073 --- /dev/null +++ b/src/monitor/deploy/monitor-cleanup.sh @@ -0,0 +1,6 @@ +kubectl delete rolebinding monitor +kubectl delete clusterrolebinding monitor +kubectl delete role monitor +kubectl delete clusterrole monitor +kubectl delete serviceaccount monitor +kubectl delete deploy monitor diff --git a/src/monitor/deploy/monitor-deploy.sh b/src/monitor/deploy/monitor-deploy.sh new file mode 100755 index 00000000..47c7120f --- /dev/null +++ b/src/monitor/deploy/monitor-deploy.sh @@ -0,0 +1,6 @@ +kubectl apply -f role.yaml +kubectl apply -f cluster_role.yaml +kubectl apply -f role_binding.yaml +kubectl apply -f clusterrole_binding.yaml +kubectl apply -f service_account.yaml +kubectl apply -f operator.yaml diff --git a/src/monitor/deploy/operator.yaml b/src/monitor/deploy/operator.yaml index a06c07d3..93e4522c 100644 --- a/src/monitor/deploy/operator.yaml +++ b/src/monitor/deploy/operator.yaml @@ -3,30 +3,28 @@ kind: Deployment metadata: name: monitor labels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" spec: replicas: 1 selector: matchLabels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" template: metadata: labels: - "emco/deployment-id": "bionic-beaver" + "emco/deployment-id": "monitor" spec: serviceAccountName: monitor containers: - name: monitor # Replace this with the built image name - image: k8splugin.io/monitor:latest + image: ewmduck/monitor:latest command: - - monitor + - /opt/monitor/monitor imagePullPolicy: IfNotPresent env: - name: WATCH_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace + value: "" - name: POD_NAME valueFrom: fieldRef: diff --git a/src/monitor/deploy/role.yaml b/src/monitor/deploy/role.yaml index 4d0fd1b6..c48141ac 100644 --- a/src/monitor/deploy/role.yaml +++ b/src/monitor/deploy/role.yaml @@ -58,3 +58,15 @@ rules: - '*' verbs: - '*' +- apiGroups: + - batch + resources: + - '*' + verbs: + - '*' +- apiGroups: + - extensions + resources: + - '*' + verbs: + - '*' diff --git a/src/monitor/go.mod b/src/monitor/go.mod index ec48d268..6eff59af 100644 --- a/src/monitor/go.mod +++ b/src/monitor/go.mod @@ -32,6 +32,4 @@ replace ( sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde ) -// Remove hg dependency using this mirror replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.9.0 - diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go index 231f226e..064591fc 100644 --- a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go +++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go @@ -15,8 +15,8 @@ import ( // +kubebuilder:subresource:status // +genclient type ResourceBundleState struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + metav1.TypeMeta `json:",inline" yaml:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata" yaml:"metadata"` Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` diff --git a/src/orchestrator/api/api.go b/src/orchestrator/api/api.go index 2470a1be..5abbb96d 100644 --- a/src/orchestrator/api/api.go +++ b/src/orchestrator/api/api.go @@ -180,6 +180,8 @@ func NewRouter(projectClient moduleLib.ProjectManager, } router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/instantiate", instantiationHandler.instantiateHandler).Methods("POST") + router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/terminate", instantiationHandler.terminateHandler).Methods("POST") + router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/status", instantiationHandler.statusHandler).Methods("GET") return router } diff --git a/src/orchestrator/api/instantiation_handler.go b/src/orchestrator/api/instantiation_handler.go index c95785f2..ce50e5b8 100644 --- a/src/orchestrator/api/instantiation_handler.go +++ b/src/orchestrator/api/instantiation_handler.go @@ -17,9 +17,11 @@ package api import ( + "encoding/json" + "net/http" + "github.com/gorilla/mux" moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module" - "net/http" ) /* Used to store backend implementation objects @@ -45,3 +47,45 @@ func (h instantiationHandler) instantiateHandler(w http.ResponseWriter, r *http. w.WriteHeader(http.StatusAccepted) } + +func (h instantiationHandler) terminateHandler(w http.ResponseWriter, r *http.Request) { + + vars := mux.Vars(r) + p := vars["project-name"] + ca := vars["composite-app-name"] + v := vars["composite-app-version"] + di := vars["deployment-intent-group-name"] + + iErr := h.client.Terminate(p, ca, v, di) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) + +} + +func (h instantiationHandler) statusHandler(w http.ResponseWriter, r *http.Request) { + + vars := mux.Vars(r) + p := vars["project-name"] + ca := vars["composite-app-name"] + v := vars["composite-app-version"] + di := vars["deployment-intent-group-name"] + + status, iErr := h.client.Status(p, ca, v, di) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + iErr = json.NewEncoder(w).Encode(status) + if iErr != nil { + http.Error(w, iErr.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) + +} diff --git a/src/orchestrator/go.mod b/src/orchestrator/go.mod index 223dc068..3f14f00b 100644 --- a/src/orchestrator/go.mod +++ b/src/orchestrator/go.mod @@ -2,17 +2,12 @@ module github.com/onap/multicloud-k8s/src/orchestrator require ( github.com/MakeNowJust/heredoc v1.0.0 // indirect - github.com/Masterminds/goutils v1.1.0 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1 github.com/coreos/etcd v3.3.12+incompatible - github.com/cyphar/filepath-securejoin v0.2.2 // indirect github.com/docker/docker v1.13.1 // indirect - github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect - github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect github.com/ghodss/yaml v1.0.0 - github.com/gobwas/glob v0.2.3 // indirect github.com/golang/protobuf v1.4.1 github.com/gorilla/handlers v1.3.0 github.com/gorilla/mux v1.7.3 @@ -21,34 +16,32 @@ require ( github.com/lib/pq v1.6.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mitchellh/copystructure v1.0.0 // indirect - github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000 + github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200630152613-7c20f73e7c5d github.com/onap/multicloud-k8s/src/ncm v0.0.0-20200515060444-c77850a75eee + github.com/onap/multicloud-k8s/src/rsync v0.0.0-20200630152613-7c20f73e7c5d github.com/pkg/errors v0.8.1 github.com/rubenv/sql-migrate v0.0.0-20200429072036-ae26b214fa43 // indirect github.com/russross/blackfriday v1.5.2 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v1.0.0 // indirect - github.com/technosophos/moniker v0.0.0-20180509230615-a5dbd03a2245 // indirect go.etcd.io/etcd v3.3.12+incompatible go.mongodb.org/mongo-driver v1.0.0 golang.org/x/net v0.0.0-20200301022130-244492dfa37a google.golang.org/grpc v1.27.1 google.golang.org/protobuf v1.24.0 gopkg.in/square/go-jose.v2 v2.5.1 // indirect + gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86 - k8s.io/apiextensions-apiserver v0.0.0-00010101000000-000000000000 // indirect k8s.io/apimachinery v0.0.0-20190831074630-461753078381 - k8s.io/apiserver v0.0.0-00010101000000-000000000000 // indirect - k8s.io/cli-runtime v0.0.0-00010101000000-000000000000 // indirect k8s.io/cloud-provider v0.0.0-00010101000000-000000000000 // indirect k8s.io/helm v2.14.3+incompatible sigs.k8s.io/kustomize v2.0.3+incompatible // indirect - vbom.ml/util v0.0.0-20180919145318-efcd4e0f9787 // indirect ) replace ( github.com/onap/multicloud-k8s/src/clm => ../clm + github.com/onap/multicloud-k8s/src/monitor => ../monitor k8s.io/api => k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go index a847ae32..cdf23bfa 100644 --- a/src/orchestrator/pkg/appcontext/appcontext.go +++ b/src/orchestrator/pkg/appcontext/appcontext.go @@ -18,10 +18,11 @@ package appcontext import ( "fmt" + "strings" + log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/rtcontext" pkgerrors "github.com/pkg/errors" - "strings" ) // metaPrefix used for denoting clusterMeta level @@ -413,6 +414,59 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string, return v, nil } +//AddStatus for holding status of all resources under app and cluster +// handle should be a cluster handle +func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) { + h, err := ac.rtc.RtcAddStatus(handle, value) + if err != nil { + return nil, err + } + log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h}) + + return h, nil +} + +//DeleteStatus for the given the handle +func (ac *AppContext) DeleteStatus(handle interface{}) error { + err := ac.rtc.RtcDeletePair(handle) + if err != nil { + return err + } + return nil +} + +//Return the handle for status for a given app and cluster +func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) { + if appname == "" { + return nil, pkgerrors.Errorf("Not a valid run time context app name") + } + if clustername == "" { + return nil, pkgerrors.Errorf("Not a valid run time context cluster name") + } + + rh, err := ac.rtc.RtcGet() + if err != nil { + return nil, err + } + + acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/status/" + hs, err := ac.rtc.RtcGetHandles(acrh) + if err != nil { + return nil, err + } + for _, v := range hs { + if v == acrh { + return v, nil + } + } + return nil, pkgerrors.Errorf("No handle was found for the given resource") +} + +//UpdateStatusValue updates the status value with the given handle +func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) error { + return ac.rtc.RtcUpdateValue(handle, value) +} + //Return all the handles under the composite app func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) { hs, err := ac.rtc.RtcGetHandles(handle) diff --git a/src/orchestrator/pkg/appcontext/appcontext_test.go b/src/orchestrator/pkg/appcontext/appcontext_test.go index 05c73703..92c43113 100644 --- a/src/orchestrator/pkg/appcontext/appcontext_test.go +++ b/src/orchestrator/pkg/appcontext/appcontext_test.go @@ -145,6 +145,10 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{ return c.Err } +func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { + return nil, nil +} + func TestCreateCompositeApp(t *testing.T) { var ac = AppContext{} testCases := []struct { diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go index 16a14c7b..3412a034 100644 --- a/src/orchestrator/pkg/module/deployment_intent_groups.go +++ b/src/orchestrator/pkg/module/deployment_intent_groups.go @@ -20,6 +20,7 @@ import ( "encoding/json" "reflect" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" pkgerrors "github.com/pkg/errors" @@ -61,6 +62,7 @@ type OverrideValues struct { type DeploymentIntentGroupManager interface { CreateDeploymentIntentGroup(d DeploymentIntentGroup, p string, ca string, v string) (DeploymentIntentGroup, error) GetDeploymentIntentGroup(di string, p string, ca string, v string) (DeploymentIntentGroup, error) + GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error } @@ -86,6 +88,7 @@ func (dk DeploymentIntentGroupKey) String() string { type DeploymentIntentGroupClient struct { storeName string tagMetaData string + tagContext string } // NewDeploymentIntentGroupClient return an instance of DeploymentIntentGroupClient which implements DeploymentIntentGroupManager @@ -93,6 +96,7 @@ func NewDeploymentIntentGroupClient() *DeploymentIntentGroupClient { return &DeploymentIntentGroupClient{ storeName: "orchestrator", tagMetaData: "deploymentintentgroupmetadata", + tagContext: "contextid", } } @@ -160,6 +164,34 @@ func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroup(di string, p stri } +// GetDeploymentIntentGroup returns the DeploymentIntentGroup with a given name, project, compositeApp and version of compositeApp +func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error) { + + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, + } + + result, err := db.DBconn.Find(c.storeName, key, c.tagContext) + if err != nil { + return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get DeploymentIntentGroup Context error") + } + + if result != nil { + ctxVal := string(result[0]) + var cc appcontext.AppContext + _, err = cc.LoadAppContext(ctxVal) + if err != nil { + return appcontext.AppContext{}, pkgerrors.Wrap(err, "Error loading DeploymentIntentGroup Appcontext") + } + return cc, nil + } + + return appcontext.AppContext{}, pkgerrors.New("Error getting DeploymentIntentGroup AppContext") +} + // DeleteDeploymentIntentGroup deletes a DeploymentIntentGroup func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error { k := DeploymentIntentGroupKey{ diff --git a/src/orchestrator/pkg/module/instantiation.go b/src/orchestrator/pkg/module/instantiation.go index 043b80f2..9432e4b9 100644 --- a/src/orchestrator/pkg/module/instantiation.go +++ b/src/orchestrator/pkg/module/instantiation.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" + rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" @@ -42,17 +43,25 @@ type InstantiationClient struct { db InstantiationClientDbInfo } +type ClusterAppStatus struct { + Cluster string + App string + Status rb.ResourceBundleStatus +} + +type StatusData struct { + Data []ClusterAppStatus +} + /* InstantiationKey used in storing the contextid in the momgodb It consists of -GenericPlacementIntentName, ProjectName, CompositeAppName, CompositeAppVersion, DeploymentIntentGroup */ type InstantiationKey struct { - IntentName string Project string CompositeApp string Version string @@ -64,6 +73,8 @@ type InstantiationKey struct { type InstantiationManager interface { //ApproveInstantiation(p string, ca string, v string, di string) (error) Instantiate(p string, ca string, v string, di string) error + Status(p string, ca string, v string, di string) (StatusData, error) + Terminate(p string, ca string, v string, di string) error } // InstantiationClientDbInfo consists of storeName and tagContext @@ -229,6 +240,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name) } + statusResource, err := getStatusResource(ctxval.(string), eachApp.Metadata.Name) + if err != nil { + return pkgerrors.Wrapf(err, "Unable to generate the status resource for app :: %s", eachApp.Metadata.Name) + } + resources = append(resources, statusResource) + specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent) if err != nil { return pkgerrors.Wrap(err, "Unable to get the intents for app") @@ -269,12 +286,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin //END: storing into etcd // BEGIN:: save the context in the orchestrator db record - key := InstantiationKey{ - IntentName: gIntent, - Project: p, - CompositeApp: ca, - Version: v, - DeploymentIntentGroup: di, + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, } err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagContext, ctxval) @@ -324,3 +340,84 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca}) return err } + +/* +Status takes in projectName, compositeAppName, compositeAppVersion, +DeploymentIntentName. This method is responsible obtaining the status of +the deployment, which is made available in the appcontext. +*/ +func (c InstantiationClient) Status(p string, ca string, v string, di string) (StatusData, error) { + + ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + if err != nil { + return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di) + } + + // Get all apps in this composite app + allApps, err := NewAppClient().GetApps(p, ca, v) + if err != nil { + return StatusData{}, pkgerrors.Wrap(err, "Not finding the apps") + } + + var diStatus StatusData + diStatus.Data = make([]ClusterAppStatus, 0) + + // Loop through each app and get the status data for each cluster in the app + for _, app := range allApps { + // Get the clusters in the appcontext for this app + clusters, err := ac.GetClusterNames(app.Metadata.Name) + if err != nil { + log.Info(":: No clusters for app ::", log.Fields{"AppName": app.Metadata.Name}) + continue + } + + for _, cluster := range clusters { + handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster) + if err != nil { + log.Info(":: No status handle for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + statusValue, err := ac.GetValue(handle) + if err != nil { + log.Info(":: No status value for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + log.Info(":: STATUS VALUE ::", log.Fields{"statusValue": statusValue}) + var statusData ClusterAppStatus + err = json.Unmarshal([]byte(statusValue.(string)), &statusData.Status) + if err != nil { + log.Info(":: Error unmarshaling status value for cluster, app ::", + log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) + continue + } + statusData.Cluster = cluster + statusData.App = app.Metadata.Name + log.Info(":: STATUS DATA ::", log.Fields{"status": statusData}) + + diStatus.Data = append(diStatus.Data, statusData) + } + } + + return diStatus, nil +} + +/* +Terminate takes in projectName, compositeAppName, compositeAppVersion, +DeploymentIntentName and calls rsync to terminate. +*/ +func (c InstantiationClient) Terminate(p string, ca string, v string, di string) error { + + //ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + _, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v) + if err != nil { + return pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di) + } + + // TODO - make call to rsync to terminate the composite app deployment + // will leave the appcontext in place for clean up later + // so monitoring status can be performed + + return nil +} diff --git a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go index 43ddd6df..e6e2bf30 100644 --- a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go +++ b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go @@ -25,12 +25,16 @@ import ( "encoding/json" "io/ioutil" + jyaml "github.com/ghodss/yaml" + + rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" "github.com/onap/multicloud-k8s/src/orchestrator/utils" "github.com/onap/multicloud-k8s/src/orchestrator/utils/helm" pkgerrors "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // resource consists of name of reource @@ -90,6 +94,45 @@ func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) { return resources, nil } +// addStatusResource adds a status monitoring resource to the app +// which consists of name(name+kind) and content +func getStatusResource(id, app string) (resource, error) { + + var statusCr rb.ResourceBundleState + + label := id + "-" + app + name := app + "-" + id + + statusCr.TypeMeta.APIVersion = "k8splugin.io/v1alpha1" + statusCr.TypeMeta.Kind = "ResourceBundleState" + statusCr.SetName(name) + + labels := make(map[string]string) + labels["emco/deployment-id"] = label + statusCr.SetLabels(labels) + + labelSelector, err := metav1.ParseToLabelSelector("emco/deployment-id = " + label) + if err != nil { + log.Info(":: ERROR Parsing Label Selector ::", log.Fields{"Error": err}) + } else { + statusCr.Spec.Selector = labelSelector + } + + // Marshaling to json then convert to yaml works better than marshaling to yaml + // The 'apiVersion' attribute was marshaling to 'apiversion' + // y, _ := yaml.Marshal(&statusCr) + j, _ := json.Marshal(&statusCr) + y, _ := jyaml.JSONToYAML(j) + log.Info(":: RESULTING STATUS CR ::", log.Fields{"StatusCR": y}) + + statusResource := resource{ + name: name + "+" + "ResourceBundleState", + filecontent: string(y), + } + + return statusResource, nil +} + func addResourcesToCluster(ct appcontext.AppContext, ch interface{}, resources []resource) error { var resOrderInstr struct { diff --git a/src/orchestrator/pkg/rtcontext/rtcontext.go b/src/orchestrator/pkg/rtcontext/rtcontext.go index 432c5d87..f3905eb0 100644 --- a/src/orchestrator/pkg/rtcontext/rtcontext.go +++ b/src/orchestrator/pkg/rtcontext/rtcontext.go @@ -18,11 +18,12 @@ package rtcontext import ( "fmt" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb" - pkgerrors "github.com/pkg/errors" "math/rand" "strings" "time" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb" + pkgerrors "github.com/pkg/errors" ) const maxrand = 0x7fffffffffffffff @@ -40,6 +41,7 @@ type Rtcontext interface { RtcAddMeta(meta interface{}) error RtcGet() (interface{}, error) RtcAddLevel(handle interface{}, level string, value string) (interface{}, error) + RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error) RtcDeletePair(handle interface{}) error @@ -201,6 +203,26 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in return (interface{})(key), nil } +// Add status under the given level and return new handle +func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { + + str := fmt.Sprintf("%v", handle) + sid := fmt.Sprintf("%v", rtc.cid) + if !strings.HasPrefix(str, sid) { + return nil, pkgerrors.Errorf("Not a valid run time context handle") + } + if value == nil { + return nil, pkgerrors.Errorf("Not a valid run time context resource value") + } + + k := str + "status" + "/" + err := contextdb.Db.Put(k, value) + if err != nil { + return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error()) + } + return (interface{})(k), nil +} + // Add a resource under the given level and return new handle func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) { diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go index 6e17f87a..fc8aa839 100644 --- a/src/rsync/pkg/connector/connector.go +++ b/src/rsync/pkg/connector/connector.go @@ -19,8 +19,6 @@ package connector import ( "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -52,7 +50,7 @@ type KubernetesConnector interface { // Reference is the interface that is implemented type Reference interface { //Create a kubernetes resource described by the yaml in yamlFilePath - Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error) + Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error) //Delete a kubernetes resource described in the provided namespace Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error } @@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = tag + labels["emco/deployment-id"] = tag podTemplateSpec.SetLabels(labels) updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index e5da1296..7e0fce3c 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -18,67 +18,68 @@ package context import ( "encoding/json" - "fmt" - "log" - "sync" - "strings" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - pkgerrors "github.com/pkg/errors" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + "fmt" + "log" + "strings" + "sync" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" + pkgerrors "github.com/pkg/errors" ) type CompositeAppContext struct { - cid interface{} + cid interface{} appsorder string appsdependency string - appsmap []instMap + appsmap []instMap } type clusterInfo struct { - name string + name string resorder string resdependency string - ressmap []instMap + ressmap []instMap } type instMap struct { - name string - depinfo string - status string - rerr error + name string + depinfo string + status string + rerr error clusters []clusterInfo } func getInstMap(order string, dependency string, level string) ([]instMap, error) { - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } + if order == "" { + return nil, pkgerrors.Errorf("Not a valid order value") + } + if dependency == "" { + return nil, pkgerrors.Errorf("Not a valid dependency value") + } + if !(level == "app" || level == "res") { + return nil, pkgerrors.Errorf("Not a valid level name given to create map") + } - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) + var aov map[string]interface{} + json.Unmarshal([]byte(order), &aov) - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) + s := fmt.Sprintf("%vorder", level) + appso := aov[s].([]interface{}) + var instmap = make([]instMap, len(appso)) - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } + var adv map[string]interface{} + json.Unmarshal([]byte(dependency), &adv) + s = fmt.Sprintf("%vdependency", level) + appsd := adv[s].(map[string]interface{}) + for i, u := range appso { + instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} + } - return instmap, nil + return instmap, nil } func deleteResource(clustername string, resname string, respath string) error { @@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error { var gp res.Resource err = gp.Delete(respath, resname, "default", c) if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) + log.Println("Delete resource failed: " + err.Error() + resname) return err } log.Println("Resource succesfully deleted", resname) @@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error { } -func createResource(clustername string, resname string, respath string) error { +func createResource(clustername string, resname string, respath string, label string) error { k8sClient := app.KubernetesClient{} err := k8sClient.Init(clustername, resname) if err != nil { @@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error { var c con.KubernetesConnector c = &k8sClient var gp res.Resource - _, err = gp.Create(respath,"default", c) + _, err = gp.Create(respath, "default", label, c) if err != nil { - log.Println("Create failed: " + err.Error() + resname) + log.Println("Create failed: " + err.Error() + resname) return err } log.Println("Resource succesfully created", resname) @@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, } -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { +func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) if err != nil { return err @@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin if result[0] == "" { return pkgerrors.Errorf("Resource name is nil") } - err = createResource(clustername, result[0], resval.(string)) + err = createResource(clustername, result[0], resval.(string), label) if err != nil { return err } @@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources termination") + } + } + return nil } -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + cid, _ := ac.GetCompositeAppHandle() + + results := strings.Split(cid.(string), "/") + label := results[2] + "-" + appname + + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources instantiation") + } + } + return nil } func terminateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } @@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error { } - func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } + err = status.StartClusterWatcher(appmap.clusters[i].name) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) + } } log.Println("Instantiation of app done: " + appmap.name) return nil } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { +func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = instantiateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = instantiateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil } // Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { +func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = terminateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { return nil } + // Delete all the resources for a given context func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go index 8b45c341..2877e2a3 100644 --- a/src/rsync/pkg/resource/resource.go +++ b/src/rsync/pkg/resource/resource.go @@ -20,16 +20,15 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" ) type Resource struct { } // Create deployment object in a specific Kubernetes cluster -func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) { +func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) { if namespace == "" { namespace = "default" } @@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label unstruct.SetLabels(labels) // This checks if the resource we are creating has a podSpec in it // Eg: Deployment, StatefulSet, Job etc.. // If a PodSpec is found, the label will be added to it too. - connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + connector.TagPodsIfPresent(unstruct, label) gvr := mapping.Resource var createdObj *unstructured.Unstructured @@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne // Delete an existing resource hosted in a specific Kubernetes cluster func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - gvr := mapping.Resource - deletePolicy := metav1.DeletePropagationForeground - opts := &metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - } - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) - case meta.RESTScopeNameRoot: - err = dynClient.Resource(gvr).Delete(resname, opts) - default: - return pkgerrors.New("Got an unknown RESTSCopeName for mappin") - } - - if err != nil { - return pkgerrors.Wrap(err, "Delete object error") - } - return nil + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAMLData(data, unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + gvr := mapping.Resource + deletePolicy := metav1.DeletePropagationForeground + opts := &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) + case meta.RESTScopeNameRoot: + err = dynClient.Resource(gvr).Delete(resname, opts) + default: + return pkgerrors.New("Got an unknown RESTSCopeName for mappin") + } + + if err != nil { + return pkgerrors.Wrap(err, "Delete object error") + } + return nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go index 28bffefd..351da027 100644 --- a/src/rsync/pkg/status/status.go +++ b/src/rsync/pkg/status/status.go @@ -17,16 +17,20 @@ package status import ( + "encoding/base64" "encoding/json" "fmt" + "strings" "sync" pkgerrors "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id" // HandleStatusUpdate for an application in a cluster // TODO: Add code for specific handling -func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error { - logrus.Info("label::", id) +func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) { //status := v.Status.ServiceStatuses //podStatus := v.Status.PodStatuses - // Store Pod Status in app context - out, _ := json.Marshal(v.Status) - logrus.Info("Status::", string(out)) - return nil + + // Get the contextId from the label (id) + result := strings.SplitN(id, "-", 2) + if result[0] == "" { + logrus.Info(clusterId, "::label is missing an appcontext identifier::", id) + return + } + + if len(result) != 2 { + logrus.Info(clusterId, "::invalid label format::", id) + return + } + + // Get the app from the label (id) + if result[1] == "" { + logrus.Info(clusterId, "::label is missing an app identifier::", id) + return + } + + // Look up the contextId + var ac appcontext.AppContext + _, err := ac.LoadAppContext(result[0]) + if err != nil { + logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err) + return + } + + // produce yaml representation of the status + vjson, err := json.Marshal(v.Status) + if err != nil { + logrus.Info(clusterId, "::Error marshalling status information::", err) + return + } + + // Get the handle for the context/app/cluster status object + handle, err := ac.GetStatusHandle(result[1], clusterId) + if err != nil { + // Expected first time + logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err) + } + + // If status handle was not found, then create the status object in the appcontext + if handle == nil { + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err) + } else { + ac.AddStatus(chandle, string(vjson)) + } + } else { + ac.UpdateStatusValue(handle, string(vjson)) + } + + return } // StartClusterWatcher watches for CR // configBytes - Kubectl file data -func StartClusterWatcher(provider, name string, configBytes []byte) error { - key := provider + "+" + name +func StartClusterWatcher(clusterId string) error { + + configBytes, err := getKubeConfig(clusterId) + if err != nil { + return err + } + + //key := provider + "+" + name // Get the lock channelData.Lock() defer channelData.Unlock() @@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { if channelData.channels == nil { channelData.channels = make(map[string]chan struct{}) } - _, ok := channelData.channels[key] + _, ok := channelData.channels[clusterId] if !ok { // Create Channel - channelData.channels[key] = make(chan struct{}) + channelData.channels[clusterId] = make(chan struct{}) // Create config config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) if err != nil { @@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { // Create Informer mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() - go scheduleStatus(provider, name, channelData.channels[key], mInformer) + go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer) } return nil } // StopClusterWatcher stop watching a cluster -func StopClusterWatcher(provider, name string) { - key := provider + "+" + name +func StopClusterWatcher(clusterId string) { + //key := provider + "+" + name if channelData.channels != nil { - c, ok := channelData.channels[key] + c, ok := channelData.channels[clusterId] if ok { close(c) } @@ -108,7 +167,7 @@ func CloseAllClusterWatchers() { } // Per Cluster Go routine to watch CR -func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) { +func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) { handlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { v, ok := obj.(*v1alpha1.ResourceBundleState) @@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde s.AddEventHandler(handlers) s.Run(c) } + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +// TODO - consolidate with other rsync methods to get kubeconfig files +func getKubeConfig(clustername string) ([]byte, error) { + + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil +} |