summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/monitor/build/Dockerfile21
-rw-r--r--src/monitor/deploy/cluster_role.yaml72
-rw-r--r--src/monitor/deploy/clusterrole_binding.yaml12
-rwxr-xr-xsrc/monitor/deploy/monitor-cleanup.sh6
-rwxr-xr-xsrc/monitor/deploy/monitor-deploy.sh6
-rw-r--r--src/monitor/deploy/operator.yaml14
-rw-r--r--src/monitor/deploy/role.yaml12
-rw-r--r--src/monitor/go.mod2
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/types.go4
-rw-r--r--src/orchestrator/api/api.go2
-rw-r--r--src/orchestrator/api/instantiation_handler.go46
-rw-r--r--src/orchestrator/go.mod15
-rw-r--r--src/orchestrator/pkg/appcontext/appcontext.go56
-rw-r--r--src/orchestrator/pkg/appcontext/appcontext_test.go4
-rw-r--r--src/orchestrator/pkg/module/deployment_intent_groups.go32
-rw-r--r--src/orchestrator/pkg/module/instantiation.go113
-rw-r--r--src/orchestrator/pkg/module/instantiation_appcontext_helper.go43
-rw-r--r--src/orchestrator/pkg/rtcontext/rtcontext.go26
-rw-r--r--src/rsync/pkg/connector/connector.go6
-rw-r--r--src/rsync/pkg/context/context.go400
-rw-r--r--src/rsync/pkg/resource/resource.go91
-rw-r--r--src/rsync/pkg/status/status.go117
22 files changed, 794 insertions, 306 deletions
diff --git a/src/monitor/build/Dockerfile b/src/monitor/build/Dockerfile
index 812eb47e..9ecff169 100644
--- a/src/monitor/build/Dockerfile
+++ b/src/monitor/build/Dockerfile
@@ -1,15 +1,16 @@
-FROM registry.access.redhat.com/ubi7/ubi-minimal:latest
+FROM golang:1.14.1
-ENV OPERATOR=/usr/local/bin/monitor \
- USER_UID=1001 \
- USER_NAME=monitor
+WORKDIR /go/src/github.com/onap/multicloud-k8s/src/monitor
+COPY ./ ./
+RUN make all
-# install operator binary
-COPY _output/bin/monitor ${OPERATOR}
+FROM ubuntu:16.04
-COPY bin /usr/local/bin
-RUN /usr/local/bin/user_setup
+WORKDIR /opt/monitor
+RUN groupadd -r monitor && useradd -r -g monitor monitor
+RUN chown monitor:monitor /opt/monitor -R
+COPY --chown=monitor --from=0 /go/src/github.com/onap/multicloud-k8s/src/monitor/monitor ./
-ENTRYPOINT ["/usr/local/bin/entrypoint"]
+USER monitor
+ENTRYPOINT ["/opt/monitor/monitor"]
-USER ${USER_UID}
diff --git a/src/monitor/deploy/cluster_role.yaml b/src/monitor/deploy/cluster_role.yaml
new file mode 100644
index 00000000..0732e8d3
--- /dev/null
+++ b/src/monitor/deploy/cluster_role.yaml
@@ -0,0 +1,72 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ creationTimestamp: null
+ name: monitor
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - services
+ - endpoints
+ - persistentvolumeclaims
+ - events
+ - configmaps
+ - secrets
+ verbs:
+ - '*'
+- apiGroups:
+ - apps
+ resources:
+ - deployments
+ - daemonsets
+ - replicasets
+ - statefulsets
+ verbs:
+ - '*'
+- apiGroups:
+ - monitoring.coreos.com
+ resources:
+ - servicemonitors
+ verbs:
+ - get
+ - create
+- apiGroups:
+ - apps
+ resourceNames:
+ - monitor
+ resources:
+ - deployments/finalizers
+ verbs:
+ - update
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ verbs:
+ - get
+- apiGroups:
+ - apps
+ resources:
+ - replicasets
+ verbs:
+ - get
+- apiGroups:
+ - k8splugin.io
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - batch
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - extensions
+ resources:
+ - '*'
+ verbs:
+ - '*'
diff --git a/src/monitor/deploy/clusterrole_binding.yaml b/src/monitor/deploy/clusterrole_binding.yaml
new file mode 100644
index 00000000..73e74039
--- /dev/null
+++ b/src/monitor/deploy/clusterrole_binding.yaml
@@ -0,0 +1,12 @@
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: monitor
+subjects:
+- kind: ServiceAccount
+ name: monitor
+ namespace: default
+roleRef:
+ kind: ClusterRole
+ name: monitor
+ apiGroup: rbac.authorization.k8s.io
diff --git a/src/monitor/deploy/monitor-cleanup.sh b/src/monitor/deploy/monitor-cleanup.sh
new file mode 100755
index 00000000..21725073
--- /dev/null
+++ b/src/monitor/deploy/monitor-cleanup.sh
@@ -0,0 +1,6 @@
+kubectl delete rolebinding monitor
+kubectl delete clusterrolebinding monitor
+kubectl delete role monitor
+kubectl delete clusterrole monitor
+kubectl delete serviceaccount monitor
+kubectl delete deploy monitor
diff --git a/src/monitor/deploy/monitor-deploy.sh b/src/monitor/deploy/monitor-deploy.sh
new file mode 100755
index 00000000..47c7120f
--- /dev/null
+++ b/src/monitor/deploy/monitor-deploy.sh
@@ -0,0 +1,6 @@
+kubectl apply -f role.yaml
+kubectl apply -f cluster_role.yaml
+kubectl apply -f role_binding.yaml
+kubectl apply -f clusterrole_binding.yaml
+kubectl apply -f service_account.yaml
+kubectl apply -f operator.yaml
diff --git a/src/monitor/deploy/operator.yaml b/src/monitor/deploy/operator.yaml
index a06c07d3..93e4522c 100644
--- a/src/monitor/deploy/operator.yaml
+++ b/src/monitor/deploy/operator.yaml
@@ -3,30 +3,28 @@ kind: Deployment
metadata:
name: monitor
labels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
spec:
replicas: 1
selector:
matchLabels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
template:
metadata:
labels:
- "emco/deployment-id": "bionic-beaver"
+ "emco/deployment-id": "monitor"
spec:
serviceAccountName: monitor
containers:
- name: monitor
# Replace this with the built image name
- image: k8splugin.io/monitor:latest
+ image: ewmduck/monitor:latest
command:
- - monitor
+ - /opt/monitor/monitor
imagePullPolicy: IfNotPresent
env:
- name: WATCH_NAMESPACE
- valueFrom:
- fieldRef:
- fieldPath: metadata.namespace
+ value: ""
- name: POD_NAME
valueFrom:
fieldRef:
diff --git a/src/monitor/deploy/role.yaml b/src/monitor/deploy/role.yaml
index 4d0fd1b6..c48141ac 100644
--- a/src/monitor/deploy/role.yaml
+++ b/src/monitor/deploy/role.yaml
@@ -58,3 +58,15 @@ rules:
- '*'
verbs:
- '*'
+- apiGroups:
+ - batch
+ resources:
+ - '*'
+ verbs:
+ - '*'
+- apiGroups:
+ - extensions
+ resources:
+ - '*'
+ verbs:
+ - '*'
diff --git a/src/monitor/go.mod b/src/monitor/go.mod
index ec48d268..6eff59af 100644
--- a/src/monitor/go.mod
+++ b/src/monitor/go.mod
@@ -32,6 +32,4 @@ replace (
sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde
)
-// Remove hg dependency using this mirror
replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.9.0
-
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
index 231f226e..064591fc 100644
--- a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
@@ -15,8 +15,8 @@ import (
// +kubebuilder:subresource:status
// +genclient
type ResourceBundleState struct {
- metav1.TypeMeta `json:",inline"`
- metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+ metav1.TypeMeta `json:",inline" yaml:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata" yaml:"metadata"`
Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
diff --git a/src/orchestrator/api/api.go b/src/orchestrator/api/api.go
index 2470a1be..5abbb96d 100644
--- a/src/orchestrator/api/api.go
+++ b/src/orchestrator/api/api.go
@@ -180,6 +180,8 @@ func NewRouter(projectClient moduleLib.ProjectManager,
}
router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/instantiate", instantiationHandler.instantiateHandler).Methods("POST")
+ router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/terminate", instantiationHandler.terminateHandler).Methods("POST")
+ router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/status", instantiationHandler.statusHandler).Methods("GET")
return router
}
diff --git a/src/orchestrator/api/instantiation_handler.go b/src/orchestrator/api/instantiation_handler.go
index c95785f2..ce50e5b8 100644
--- a/src/orchestrator/api/instantiation_handler.go
+++ b/src/orchestrator/api/instantiation_handler.go
@@ -17,9 +17,11 @@
package api
import (
+ "encoding/json"
+ "net/http"
+
"github.com/gorilla/mux"
moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module"
- "net/http"
)
/* Used to store backend implementation objects
@@ -45,3 +47,45 @@ func (h instantiationHandler) instantiateHandler(w http.ResponseWriter, r *http.
w.WriteHeader(http.StatusAccepted)
}
+
+func (h instantiationHandler) terminateHandler(w http.ResponseWriter, r *http.Request) {
+
+ vars := mux.Vars(r)
+ p := vars["project-name"]
+ ca := vars["composite-app-name"]
+ v := vars["composite-app-version"]
+ di := vars["deployment-intent-group-name"]
+
+ iErr := h.client.Terminate(p, ca, v, di)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusAccepted)
+
+}
+
+func (h instantiationHandler) statusHandler(w http.ResponseWriter, r *http.Request) {
+
+ vars := mux.Vars(r)
+ p := vars["project-name"]
+ ca := vars["composite-app-name"]
+ v := vars["composite-app-version"]
+ di := vars["deployment-intent-group-name"]
+
+ status, iErr := h.client.Status(p, ca, v, di)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ iErr = json.NewEncoder(w).Encode(status)
+ if iErr != nil {
+ http.Error(w, iErr.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusAccepted)
+
+}
diff --git a/src/orchestrator/go.mod b/src/orchestrator/go.mod
index 223dc068..3f14f00b 100644
--- a/src/orchestrator/go.mod
+++ b/src/orchestrator/go.mod
@@ -2,17 +2,12 @@ module github.com/onap/multicloud-k8s/src/orchestrator
require (
github.com/MakeNowJust/heredoc v1.0.0 // indirect
- github.com/Masterminds/goutils v1.1.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1
github.com/coreos/etcd v3.3.12+incompatible
- github.com/cyphar/filepath-securejoin v0.2.2 // indirect
github.com/docker/docker v1.13.1 // indirect
- github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect
- github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/ghodss/yaml v1.0.0
- github.com/gobwas/glob v0.2.3 // indirect
github.com/golang/protobuf v1.4.1
github.com/gorilla/handlers v1.3.0
github.com/gorilla/mux v1.7.3
@@ -21,34 +16,32 @@ require (
github.com/lib/pq v1.6.0 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
- github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000
+ github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200630152613-7c20f73e7c5d
github.com/onap/multicloud-k8s/src/ncm v0.0.0-20200515060444-c77850a75eee
+ github.com/onap/multicloud-k8s/src/rsync v0.0.0-20200630152613-7c20f73e7c5d
github.com/pkg/errors v0.8.1
github.com/rubenv/sql-migrate v0.0.0-20200429072036-ae26b214fa43 // indirect
github.com/russross/blackfriday v1.5.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v1.0.0 // indirect
- github.com/technosophos/moniker v0.0.0-20180509230615-a5dbd03a2245 // indirect
go.etcd.io/etcd v3.3.12+incompatible
go.mongodb.org/mongo-driver v1.0.0
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
google.golang.org/grpc v1.27.1
google.golang.org/protobuf v1.24.0
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
+ gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86
- k8s.io/apiextensions-apiserver v0.0.0-00010101000000-000000000000 // indirect
k8s.io/apimachinery v0.0.0-20190831074630-461753078381
- k8s.io/apiserver v0.0.0-00010101000000-000000000000 // indirect
- k8s.io/cli-runtime v0.0.0-00010101000000-000000000000 // indirect
k8s.io/cloud-provider v0.0.0-00010101000000-000000000000 // indirect
k8s.io/helm v2.14.3+incompatible
sigs.k8s.io/kustomize v2.0.3+incompatible // indirect
- vbom.ml/util v0.0.0-20180919145318-efcd4e0f9787 // indirect
)
replace (
github.com/onap/multicloud-k8s/src/clm => ../clm
+ github.com/onap/multicloud-k8s/src/monitor => ../monitor
k8s.io/api => k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d
diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go
index a847ae32..cdf23bfa 100644
--- a/src/orchestrator/pkg/appcontext/appcontext.go
+++ b/src/orchestrator/pkg/appcontext/appcontext.go
@@ -18,10 +18,11 @@ package appcontext
import (
"fmt"
+ "strings"
+
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/rtcontext"
pkgerrors "github.com/pkg/errors"
- "strings"
)
// metaPrefix used for denoting clusterMeta level
@@ -413,6 +414,59 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string,
return v, nil
}
+//AddStatus for holding status of all resources under app and cluster
+// handle should be a cluster handle
+func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) {
+ h, err := ac.rtc.RtcAddStatus(handle, value)
+ if err != nil {
+ return nil, err
+ }
+ log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h})
+
+ return h, nil
+}
+
+//DeleteStatus for the given the handle
+func (ac *AppContext) DeleteStatus(handle interface{}) error {
+ err := ac.rtc.RtcDeletePair(handle)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+//Return the handle for status for a given app and cluster
+func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) {
+ if appname == "" {
+ return nil, pkgerrors.Errorf("Not a valid run time context app name")
+ }
+ if clustername == "" {
+ return nil, pkgerrors.Errorf("Not a valid run time context cluster name")
+ }
+
+ rh, err := ac.rtc.RtcGet()
+ if err != nil {
+ return nil, err
+ }
+
+ acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/status/"
+ hs, err := ac.rtc.RtcGetHandles(acrh)
+ if err != nil {
+ return nil, err
+ }
+ for _, v := range hs {
+ if v == acrh {
+ return v, nil
+ }
+ }
+ return nil, pkgerrors.Errorf("No handle was found for the given resource")
+}
+
+//UpdateStatusValue updates the status value with the given handle
+func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) error {
+ return ac.rtc.RtcUpdateValue(handle, value)
+}
+
//Return all the handles under the composite app
func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) {
hs, err := ac.rtc.RtcGetHandles(handle)
diff --git a/src/orchestrator/pkg/appcontext/appcontext_test.go b/src/orchestrator/pkg/appcontext/appcontext_test.go
index 05c73703..92c43113 100644
--- a/src/orchestrator/pkg/appcontext/appcontext_test.go
+++ b/src/orchestrator/pkg/appcontext/appcontext_test.go
@@ -145,6 +145,10 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{
return c.Err
}
+func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+ return nil, nil
+}
+
func TestCreateCompositeApp(t *testing.T) {
var ac = AppContext{}
testCases := []struct {
diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go
index 16a14c7b..3412a034 100644
--- a/src/orchestrator/pkg/module/deployment_intent_groups.go
+++ b/src/orchestrator/pkg/module/deployment_intent_groups.go
@@ -20,6 +20,7 @@ import (
"encoding/json"
"reflect"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
pkgerrors "github.com/pkg/errors"
@@ -61,6 +62,7 @@ type OverrideValues struct {
type DeploymentIntentGroupManager interface {
CreateDeploymentIntentGroup(d DeploymentIntentGroup, p string, ca string, v string) (DeploymentIntentGroup, error)
GetDeploymentIntentGroup(di string, p string, ca string, v string) (DeploymentIntentGroup, error)
+ GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error)
DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error
}
@@ -86,6 +88,7 @@ func (dk DeploymentIntentGroupKey) String() string {
type DeploymentIntentGroupClient struct {
storeName string
tagMetaData string
+ tagContext string
}
// NewDeploymentIntentGroupClient return an instance of DeploymentIntentGroupClient which implements DeploymentIntentGroupManager
@@ -93,6 +96,7 @@ func NewDeploymentIntentGroupClient() *DeploymentIntentGroupClient {
return &DeploymentIntentGroupClient{
storeName: "orchestrator",
tagMetaData: "deploymentintentgroupmetadata",
+ tagContext: "contextid",
}
}
@@ -160,6 +164,34 @@ func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroup(di string, p stri
}
+// GetDeploymentIntentGroup returns the DeploymentIntentGroup with a given name, project, compositeApp and version of compositeApp
+func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error) {
+
+ key := DeploymentIntentGroupKey{
+ Name: di,
+ Project: p,
+ CompositeApp: ca,
+ Version: v,
+ }
+
+ result, err := db.DBconn.Find(c.storeName, key, c.tagContext)
+ if err != nil {
+ return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get DeploymentIntentGroup Context error")
+ }
+
+ if result != nil {
+ ctxVal := string(result[0])
+ var cc appcontext.AppContext
+ _, err = cc.LoadAppContext(ctxVal)
+ if err != nil {
+ return appcontext.AppContext{}, pkgerrors.Wrap(err, "Error loading DeploymentIntentGroup Appcontext")
+ }
+ return cc, nil
+ }
+
+ return appcontext.AppContext{}, pkgerrors.New("Error getting DeploymentIntentGroup AppContext")
+}
+
// DeleteDeploymentIntentGroup deletes a DeploymentIntentGroup
func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error {
k := DeploymentIntentGroupKey{
diff --git a/src/orchestrator/pkg/module/instantiation.go b/src/orchestrator/pkg/module/instantiation.go
index 043b80f2..9432e4b9 100644
--- a/src/orchestrator/pkg/module/instantiation.go
+++ b/src/orchestrator/pkg/module/instantiation.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
+ rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
@@ -42,17 +43,25 @@ type InstantiationClient struct {
db InstantiationClientDbInfo
}
+type ClusterAppStatus struct {
+ Cluster string
+ App string
+ Status rb.ResourceBundleStatus
+}
+
+type StatusData struct {
+ Data []ClusterAppStatus
+}
+
/*
InstantiationKey used in storing the contextid in the momgodb
It consists of
-GenericPlacementIntentName,
ProjectName,
CompositeAppName,
CompositeAppVersion,
DeploymentIntentGroup
*/
type InstantiationKey struct {
- IntentName string
Project string
CompositeApp string
Version string
@@ -64,6 +73,8 @@ type InstantiationKey struct {
type InstantiationManager interface {
//ApproveInstantiation(p string, ca string, v string, di string) (error)
Instantiate(p string, ca string, v string, di string) error
+ Status(p string, ca string, v string, di string) (StatusData, error)
+ Terminate(p string, ca string, v string, di string) error
}
// InstantiationClientDbInfo consists of storeName and tagContext
@@ -229,6 +240,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name)
}
+ statusResource, err := getStatusResource(ctxval.(string), eachApp.Metadata.Name)
+ if err != nil {
+ return pkgerrors.Wrapf(err, "Unable to generate the status resource for app :: %s", eachApp.Metadata.Name)
+ }
+ resources = append(resources, statusResource)
+
specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent)
if err != nil {
return pkgerrors.Wrap(err, "Unable to get the intents for app")
@@ -269,12 +286,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
//END: storing into etcd
// BEGIN:: save the context in the orchestrator db record
- key := InstantiationKey{
- IntentName: gIntent,
- Project: p,
- CompositeApp: ca,
- Version: v,
- DeploymentIntentGroup: di,
+ key := DeploymentIntentGroupKey{
+ Name: di,
+ Project: p,
+ CompositeApp: ca,
+ Version: v,
}
err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagContext, ctxval)
@@ -324,3 +340,84 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca})
return err
}
+
+/*
+Status takes in projectName, compositeAppName, compositeAppVersion,
+DeploymentIntentName. This method is responsible obtaining the status of
+the deployment, which is made available in the appcontext.
+*/
+func (c InstantiationClient) Status(p string, ca string, v string, di string) (StatusData, error) {
+
+ ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err != nil {
+ return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di)
+ }
+
+ // Get all apps in this composite app
+ allApps, err := NewAppClient().GetApps(p, ca, v)
+ if err != nil {
+ return StatusData{}, pkgerrors.Wrap(err, "Not finding the apps")
+ }
+
+ var diStatus StatusData
+ diStatus.Data = make([]ClusterAppStatus, 0)
+
+ // Loop through each app and get the status data for each cluster in the app
+ for _, app := range allApps {
+ // Get the clusters in the appcontext for this app
+ clusters, err := ac.GetClusterNames(app.Metadata.Name)
+ if err != nil {
+ log.Info(":: No clusters for app ::", log.Fields{"AppName": app.Metadata.Name})
+ continue
+ }
+
+ for _, cluster := range clusters {
+ handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster)
+ if err != nil {
+ log.Info(":: No status handle for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ statusValue, err := ac.GetValue(handle)
+ if err != nil {
+ log.Info(":: No status value for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ log.Info(":: STATUS VALUE ::", log.Fields{"statusValue": statusValue})
+ var statusData ClusterAppStatus
+ err = json.Unmarshal([]byte(statusValue.(string)), &statusData.Status)
+ if err != nil {
+ log.Info(":: Error unmarshaling status value for cluster, app ::",
+ log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+ continue
+ }
+ statusData.Cluster = cluster
+ statusData.App = app.Metadata.Name
+ log.Info(":: STATUS DATA ::", log.Fields{"status": statusData})
+
+ diStatus.Data = append(diStatus.Data, statusData)
+ }
+ }
+
+ return diStatus, nil
+}
+
+/*
+Terminate takes in projectName, compositeAppName, compositeAppVersion,
+DeploymentIntentName and calls rsync to terminate.
+*/
+func (c InstantiationClient) Terminate(p string, ca string, v string, di string) error {
+
+ //ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ _, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+ if err != nil {
+ return pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di)
+ }
+
+ // TODO - make call to rsync to terminate the composite app deployment
+ // will leave the appcontext in place for clean up later
+ // so monitoring status can be performed
+
+ return nil
+}
diff --git a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
index 43ddd6df..e6e2bf30 100644
--- a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
+++ b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go
@@ -25,12 +25,16 @@ import (
"encoding/json"
"io/ioutil"
+ jyaml "github.com/ghodss/yaml"
+
+ rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
"github.com/onap/multicloud-k8s/src/orchestrator/utils"
"github.com/onap/multicloud-k8s/src/orchestrator/utils/helm"
pkgerrors "github.com/pkg/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// resource consists of name of reource
@@ -90,6 +94,45 @@ func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) {
return resources, nil
}
+// addStatusResource adds a status monitoring resource to the app
+// which consists of name(name+kind) and content
+func getStatusResource(id, app string) (resource, error) {
+
+ var statusCr rb.ResourceBundleState
+
+ label := id + "-" + app
+ name := app + "-" + id
+
+ statusCr.TypeMeta.APIVersion = "k8splugin.io/v1alpha1"
+ statusCr.TypeMeta.Kind = "ResourceBundleState"
+ statusCr.SetName(name)
+
+ labels := make(map[string]string)
+ labels["emco/deployment-id"] = label
+ statusCr.SetLabels(labels)
+
+ labelSelector, err := metav1.ParseToLabelSelector("emco/deployment-id = " + label)
+ if err != nil {
+ log.Info(":: ERROR Parsing Label Selector ::", log.Fields{"Error": err})
+ } else {
+ statusCr.Spec.Selector = labelSelector
+ }
+
+ // Marshaling to json then convert to yaml works better than marshaling to yaml
+ // The 'apiVersion' attribute was marshaling to 'apiversion'
+ // y, _ := yaml.Marshal(&statusCr)
+ j, _ := json.Marshal(&statusCr)
+ y, _ := jyaml.JSONToYAML(j)
+ log.Info(":: RESULTING STATUS CR ::", log.Fields{"StatusCR": y})
+
+ statusResource := resource{
+ name: name + "+" + "ResourceBundleState",
+ filecontent: string(y),
+ }
+
+ return statusResource, nil
+}
+
func addResourcesToCluster(ct appcontext.AppContext, ch interface{}, resources []resource) error {
var resOrderInstr struct {
diff --git a/src/orchestrator/pkg/rtcontext/rtcontext.go b/src/orchestrator/pkg/rtcontext/rtcontext.go
index 432c5d87..f3905eb0 100644
--- a/src/orchestrator/pkg/rtcontext/rtcontext.go
+++ b/src/orchestrator/pkg/rtcontext/rtcontext.go
@@ -18,11 +18,12 @@ package rtcontext
import (
"fmt"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
- pkgerrors "github.com/pkg/errors"
"math/rand"
"strings"
"time"
+
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
+ pkgerrors "github.com/pkg/errors"
)
const maxrand = 0x7fffffffffffffff
@@ -40,6 +41,7 @@ type Rtcontext interface {
RtcAddMeta(meta interface{}) error
RtcGet() (interface{}, error)
RtcAddLevel(handle interface{}, level string, value string) (interface{}, error)
+ RtcAddStatus(handle interface{}, value interface{}) (interface{}, error)
RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error)
RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error)
RtcDeletePair(handle interface{}) error
@@ -201,6 +203,26 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in
return (interface{})(key), nil
}
+// Add status under the given level and return new handle
+func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+
+ str := fmt.Sprintf("%v", handle)
+ sid := fmt.Sprintf("%v", rtc.cid)
+ if !strings.HasPrefix(str, sid) {
+ return nil, pkgerrors.Errorf("Not a valid run time context handle")
+ }
+ if value == nil {
+ return nil, pkgerrors.Errorf("Not a valid run time context resource value")
+ }
+
+ k := str + "status" + "/"
+ err := contextdb.Db.Put(k, value)
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error())
+ }
+ return (interface{})(k), nil
+}
+
// Add a resource under the given level and return new handle
func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) {
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go
index 6e17f87a..fc8aa839 100644
--- a/src/rsync/pkg/connector/connector.go
+++ b/src/rsync/pkg/connector/connector.go
@@ -19,8 +19,6 @@ package connector
import (
"log"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
-
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -52,7 +50,7 @@ type KubernetesConnector interface {
// Reference is the interface that is implemented
type Reference interface {
//Create a kubernetes resource described by the yaml in yamlFilePath
- Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+ Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error)
//Delete a kubernetes resource described in the provided namespace
Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
}
@@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
if labels == nil {
labels = map[string]string{}
}
- labels[config.GetConfiguration().KubernetesLabelName] = tag
+ labels["emco/deployment-id"] = tag
podTemplateSpec.SetLabels(labels)
updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index e5da1296..7e0fce3c 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -18,67 +18,68 @@ package context
import (
"encoding/json"
- "fmt"
- "log"
- "sync"
- "strings"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
- pkgerrors "github.com/pkg/errors"
- res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
- con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+ "fmt"
+ "log"
+ "strings"
+ "sync"
+
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+ con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+ status "github.com/onap/multicloud-k8s/src/rsync/pkg/status"
+ pkgerrors "github.com/pkg/errors"
)
type CompositeAppContext struct {
- cid interface{}
+ cid interface{}
appsorder string
appsdependency string
- appsmap []instMap
+ appsmap []instMap
}
type clusterInfo struct {
- name string
+ name string
resorder string
resdependency string
- ressmap []instMap
+ ressmap []instMap
}
type instMap struct {
- name string
- depinfo string
- status string
- rerr error
+ name string
+ depinfo string
+ status string
+ rerr error
clusters []clusterInfo
}
func getInstMap(order string, dependency string, level string) ([]instMap, error) {
- if order == "" {
- return nil, pkgerrors.Errorf("Not a valid order value")
- }
- if dependency == "" {
- return nil, pkgerrors.Errorf("Not a valid dependency value")
- }
-
- if !(level == "app" || level == "res") {
- return nil, pkgerrors.Errorf("Not a valid level name given to create map")
- }
+ if order == "" {
+ return nil, pkgerrors.Errorf("Not a valid order value")
+ }
+ if dependency == "" {
+ return nil, pkgerrors.Errorf("Not a valid dependency value")
+ }
+ if !(level == "app" || level == "res") {
+ return nil, pkgerrors.Errorf("Not a valid level name given to create map")
+ }
- var aov map[string]interface{}
- json.Unmarshal([]byte(order), &aov)
+ var aov map[string]interface{}
+ json.Unmarshal([]byte(order), &aov)
- s := fmt.Sprintf("%vorder", level)
- appso := aov[s].([]interface{})
- var instmap = make([]instMap, len(appso))
+ s := fmt.Sprintf("%vorder", level)
+ appso := aov[s].([]interface{})
+ var instmap = make([]instMap, len(appso))
- var adv map[string]interface{}
- json.Unmarshal([]byte(dependency), &adv)
- s = fmt.Sprintf("%vdependency", level)
- appsd := adv[s].(map[string]interface{})
- for i, u := range appso {
- instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
- }
+ var adv map[string]interface{}
+ json.Unmarshal([]byte(dependency), &adv)
+ s = fmt.Sprintf("%vdependency", level)
+ appsd := adv[s].(map[string]interface{})
+ for i, u := range appso {
+ instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
+ }
- return instmap, nil
+ return instmap, nil
}
func deleteResource(clustername string, resname string, respath string) error {
@@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error {
var gp res.Resource
err = gp.Delete(respath, resname, "default", c)
if err != nil {
- log.Println("Delete resource failed: " + err.Error() + resname)
+ log.Println("Delete resource failed: " + err.Error() + resname)
return err
}
log.Println("Resource succesfully deleted", resname)
@@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error {
}
-func createResource(clustername string, resname string, respath string) error {
+func createResource(clustername string, resname string, respath string, label string) error {
k8sClient := app.KubernetesClient{}
err := k8sClient.Init(clustername, resname)
if err != nil {
@@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error {
var c con.KubernetesConnector
c = &k8sClient
var gp res.Resource
- _, err = gp.Create(respath,"default", c)
+ _, err = gp.Create(respath, "default", label, c)
if err != nil {
- log.Println("Create failed: " + err.Error() + resname)
+ log.Println("Create failed: " + err.Error() + resname)
return err
}
log.Println("Resource succesfully created", resname)
@@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string,
}
-func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error {
rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
if err != nil {
return err
@@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
if result[0] == "" {
return pkgerrors.Errorf("Resource name is nil")
}
- err = createResource(clustername, result[0], resval.(string))
+ err = createResource(clustername, result[0], resval.(string), label)
if err != nil {
return err
}
@@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
}
-func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i:=0; i<len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <- chans[index]
- if c != index {
+func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i := 0; i < len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <-chans[index]
+ if c != index {
ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
- for j:=0; j<len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k:=0; k<len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources termination")
- }
- }
- return nil
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+ for j := 0; j < len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k := 0; k < len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources termination")
+ }
+ }
+ return nil
}
-func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i:=0; i<len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <- chans[index]
- if c != index {
+func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ cid, _ := ac.GetCompositeAppHandle()
+
+ results := strings.Split(cid.(string), "/")
+ label := results[2] + "-" + appname
+
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i := 0; i < len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <-chans[index]
+ if c != index {
ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
- for j:=0; j<len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k:=0; k<len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources instantiation")
- }
- }
- return nil
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+ for j := 0; j < len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k := 0; k < len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources instantiation")
+ }
+ }
+ return nil
}
func terminateApp(ac appcontext.AppContext, appmap instMap) error {
- for i:=0; i<len(appmap.clusters); i++ {
+ for i := 0; i < len(appmap.clusters); i++ {
err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
+ appmap.clusters[i].name)
if err != nil {
return err
}
@@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error {
}
-
func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
- for i:=0; i<len(appmap.clusters); i++ {
+ for i := 0; i < len(appmap.clusters); i++ {
err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
+ appmap.clusters[i].name)
if err != nil {
return err
}
+ err = status.StartClusterWatcher(appmap.clusters[i].name)
+ if err != nil {
+ log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err)
+ }
}
log.Println("Instantiation of app done: " + appmap.name)
return nil
}
-func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
var wg sync.WaitGroup
var chans = make([]chan int, len(appsmap))
for l := range chans {
chans[l] = make(chan int)
}
- for i:=0; i<len(appsmap); i++ {
+ for i := 0; i < len(appsmap); i++ {
wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
appsmap[index].status = "start"
} else {
appsmap[index].status = "waiting"
- c := <- chans[index]
+ c := <-chans[index]
if c != index {
appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
}
@@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
}
appsmap[index].rerr = instantiateApp(ac, appsmap[index])
appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
- for j:=0; j<len(appsmap); j++ {
+ waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+ for j := 0; j < len(appsmap); j++ {
if appsmap[j].depinfo == waitstr {
chans[j] <- j
}
}
wg.Done()
- }(i)
- }
+ }(i)
+ }
wg.Wait()
- for k:=0; k<len(appsmap); k++ {
+ for k := 0; k < len(appsmap); k++ {
if appsmap[k].rerr != nil {
return pkgerrors.Errorf("error during apps instantiation")
}
@@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
ac := appcontext.AppContext{}
_, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.cid = cid
appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsorder = appsorder.(string)
appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
- if err != nil {
- return err
- }
+ instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
- for j:=0; j<len(instca.appsmap); j++ {
+ for j := 0; j < len(instca.appsmap); j++ {
clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k:=0; k<len(clusternames); k++ {
+ for k := 0; k < len(clusternames); k++ {
instca.appsmap[j].clusters[k].name = clusternames[k]
resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
+ instca.appsmap[j].name, clusternames[k], "order")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resorder = resorder.(string)
resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
+ instca.appsmap[j].name, clusternames[k], "dependency")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
@@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
instca.appsmap[j].clusters[k].resorder,
instca.appsmap[j].clusters[k].resdependency, "res")
if err != nil {
- return err
+ return err
}
}
}
err = instantiateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
return nil
}
// Delete all the apps
-func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
var wg sync.WaitGroup
var chans = make([]chan int, len(appsmap))
for l := range chans {
chans[l] = make(chan int)
}
- for i:=0; i<len(appsmap); i++ {
+ for i := 0; i < len(appsmap); i++ {
wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
appsmap[index].status = "start"
} else {
appsmap[index].status = "waiting"
- c := <- chans[index]
+ c := <-chans[index]
if c != index {
appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
wg.Done()
return
}
@@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
}
appsmap[index].rerr = terminateApp(ac, appsmap[index])
appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
- for j:=0; j<len(appsmap); j++ {
+ waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+ for j := 0; j < len(appsmap); j++ {
if appsmap[j].depinfo == waitstr {
chans[j] <- j
}
}
wg.Done()
- }(i)
- }
+ }(i)
+ }
wg.Wait()
- for k:=0; k<len(appsmap); k++ {
+ for k := 0; k < len(appsmap); k++ {
if appsmap[k].rerr != nil {
return pkgerrors.Errorf("error during apps instantiation")
}
@@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
return nil
}
+
// Delete all the resources for a given context
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
ac := appcontext.AppContext{}
_, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.cid = cid
appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsorder = appsorder.(string)
appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
- if err != nil {
- return err
- }
+ instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
- for j:=0; j<len(instca.appsmap); j++ {
+ for j := 0; j < len(instca.appsmap); j++ {
clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k:=0; k<len(clusternames); k++ {
+ for k := 0; k < len(clusternames); k++ {
instca.appsmap[j].clusters[k].name = clusternames[k]
resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
+ instca.appsmap[j].name, clusternames[k], "order")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resorder = resorder.(string)
resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
+ instca.appsmap[j].name, clusternames[k], "dependency")
if err != nil {
- return err
+ return err
}
instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
@@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
instca.appsmap[j].clusters[k].resorder,
instca.appsmap[j].clusters[k].resdependency, "res")
if err != nil {
- return err
+ return err
}
}
}
err = terminateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
return nil
diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go
index 8b45c341..2877e2a3 100644
--- a/src/rsync/pkg/resource/resource.go
+++ b/src/rsync/pkg/resource/resource.go
@@ -20,16 +20,15 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
- utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
"github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
)
type Resource struct {
}
// Create deployment object in a specific Kubernetes cluster
-func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) {
+func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) {
if namespace == "" {
namespace = "default"
}
@@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
if labels == nil {
labels = map[string]string{}
}
- labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ labels["emco/deployment-id"] = label
unstruct.SetLabels(labels)
// This checks if the resource we are creating has a podSpec in it
// Eg: Deployment, StatefulSet, Job etc..
// If a PodSpec is found, the label will be added to it too.
- connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ connector.TagPodsIfPresent(unstruct, label)
gvr := mapping.Resource
var createdObj *unstructured.Unstructured
@@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
// Delete an existing resource hosted in a specific Kubernetes cluster
func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error {
- if namespace == "" {
- namespace = "default"
- }
-
- //Decode the yaml file to create a runtime.Object
- unstruct := &unstructured.Unstructured{}
- //Ignore the returned obj as we expect the data in unstruct
- _, err := utils.DecodeYAMLData(data, unstruct)
- if err != nil {
- return pkgerrors.Wrap(err, "Decode deployment object error")
- }
-
- dynClient := client.GetDynamicClient()
- mapper := client.GetMapper()
-
- gvk := unstruct.GroupVersionKind()
- mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
- if err != nil {
- return pkgerrors.Wrap(err, "Mapping kind to resource error")
- }
-
- gvr := mapping.Resource
- deletePolicy := metav1.DeletePropagationForeground
- opts := &metav1.DeleteOptions{
- PropagationPolicy: &deletePolicy,
- }
-
- switch mapping.Scope.Name() {
- case meta.RESTScopeNameNamespace:
- err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
- case meta.RESTScopeNameRoot:
- err = dynClient.Resource(gvr).Delete(resname, opts)
- default:
- return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
- }
-
- if err != nil {
- return pkgerrors.Wrap(err, "Delete object error")
- }
- return nil
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ //Decode the yaml file to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err := utils.DecodeYAMLData(data, unstruct)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Decode deployment object error")
+ }
+
+ dynClient := client.GetDynamicClient()
+ mapper := client.GetMapper()
+
+ gvk := unstruct.GroupVersionKind()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Mapping kind to resource error")
+ }
+
+ gvr := mapping.Resource
+ deletePolicy := metav1.DeletePropagationForeground
+ opts := &metav1.DeleteOptions{
+ PropagationPolicy: &deletePolicy,
+ }
+
+ switch mapping.Scope.Name() {
+ case meta.RESTScopeNameNamespace:
+ err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
+ case meta.RESTScopeNameRoot:
+ err = dynClient.Resource(gvr).Delete(resname, opts)
+ default:
+ return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
+ }
+
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete object error")
+ }
+ return nil
}
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
index 28bffefd..351da027 100644
--- a/src/rsync/pkg/status/status.go
+++ b/src/rsync/pkg/status/status.go
@@ -17,16 +17,20 @@
package status
import (
+ "encoding/base64"
"encoding/json"
"fmt"
+ "strings"
"sync"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
@@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id"
// HandleStatusUpdate for an application in a cluster
// TODO: Add code for specific handling
-func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
- logrus.Info("label::", id)
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
//status := v.Status.ServiceStatuses
//podStatus := v.Status.PodStatuses
- // Store Pod Status in app context
- out, _ := json.Marshal(v.Status)
- logrus.Info("Status::", string(out))
- return nil
+
+ // Get the contextId from the label (id)
+ result := strings.SplitN(id, "-", 2)
+ if result[0] == "" {
+ logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+ return
+ }
+
+ if len(result) != 2 {
+ logrus.Info(clusterId, "::invalid label format::", id)
+ return
+ }
+
+ // Get the app from the label (id)
+ if result[1] == "" {
+ logrus.Info(clusterId, "::label is missing an app identifier::", id)
+ return
+ }
+
+ // Look up the contextId
+ var ac appcontext.AppContext
+ _, err := ac.LoadAppContext(result[0])
+ if err != nil {
+ logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+ return
+ }
+
+ // produce yaml representation of the status
+ vjson, err := json.Marshal(v.Status)
+ if err != nil {
+ logrus.Info(clusterId, "::Error marshalling status information::", err)
+ return
+ }
+
+ // Get the handle for the context/app/cluster status object
+ handle, err := ac.GetStatusHandle(result[1], clusterId)
+ if err != nil {
+ // Expected first time
+ logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+ }
+
+ // If status handle was not found, then create the status object in the appcontext
+ if handle == nil {
+ chandle, err := ac.GetClusterHandle(result[1], clusterId)
+ if err != nil {
+ logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+ } else {
+ ac.AddStatus(chandle, string(vjson))
+ }
+ } else {
+ ac.UpdateStatusValue(handle, string(vjson))
+ }
+
+ return
}
// StartClusterWatcher watches for CR
// configBytes - Kubectl file data
-func StartClusterWatcher(provider, name string, configBytes []byte) error {
- key := provider + "+" + name
+func StartClusterWatcher(clusterId string) error {
+
+ configBytes, err := getKubeConfig(clusterId)
+ if err != nil {
+ return err
+ }
+
+ //key := provider + "+" + name
// Get the lock
channelData.Lock()
defer channelData.Unlock()
@@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
if channelData.channels == nil {
channelData.channels = make(map[string]chan struct{})
}
- _, ok := channelData.channels[key]
+ _, ok := channelData.channels[clusterId]
if !ok {
// Create Channel
- channelData.channels[key] = make(chan struct{})
+ channelData.channels[clusterId] = make(chan struct{})
// Create config
config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
if err != nil {
@@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
// Create Informer
mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
- go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+ go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
}
return nil
}
// StopClusterWatcher stop watching a cluster
-func StopClusterWatcher(provider, name string) {
- key := provider + "+" + name
+func StopClusterWatcher(clusterId string) {
+ //key := provider + "+" + name
if channelData.channels != nil {
- c, ok := channelData.channels[key]
+ c, ok := channelData.channels[clusterId]
if ok {
close(c)
}
@@ -108,7 +167,7 @@ func CloseAllClusterWatchers() {
}
// Per Cluster Go routine to watch CR
-func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
v, ok := obj.(*v1alpha1.ResourceBundleState)
@@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
s.AddEventHandler(handlers)
s.Run(c)
}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+ if !strings.Contains(clustername, "+") {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+ return nil, pkgerrors.New("Get kubeconfig failed")
+ }
+
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dec, nil
+}