diff options
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r-- | src/rsync/pkg/connector/connector.go | 6 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 400 | ||||
-rw-r--r-- | src/rsync/pkg/resource/resource.go | 91 | ||||
-rw-r--r-- | src/rsync/pkg/status/status.go | 222 |
4 files changed, 475 insertions, 244 deletions
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go index 6e17f87a..fc8aa839 100644 --- a/src/rsync/pkg/connector/connector.go +++ b/src/rsync/pkg/connector/connector.go @@ -19,8 +19,6 @@ package connector import ( "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -52,7 +50,7 @@ type KubernetesConnector interface { // Reference is the interface that is implemented type Reference interface { //Create a kubernetes resource described by the yaml in yamlFilePath - Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error) + Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error) //Delete a kubernetes resource described in the provided namespace Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error } @@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = tag + labels["emco/deployment-id"] = tag podTemplateSpec.SetLabels(labels) updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index e5da1296..7e0fce3c 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -18,67 +18,68 @@ package context import ( "encoding/json" - "fmt" - "log" - "sync" - "strings" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - pkgerrors "github.com/pkg/errors" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + "fmt" + "log" + "strings" + "sync" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" + pkgerrors "github.com/pkg/errors" ) type CompositeAppContext struct { - cid interface{} + cid interface{} appsorder string appsdependency string - appsmap []instMap + appsmap []instMap } type clusterInfo struct { - name string + name string resorder string resdependency string - ressmap []instMap + ressmap []instMap } type instMap struct { - name string - depinfo string - status string - rerr error + name string + depinfo string + status string + rerr error clusters []clusterInfo } func getInstMap(order string, dependency string, level string) ([]instMap, error) { - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } + if order == "" { + return nil, pkgerrors.Errorf("Not a valid order value") + } + if dependency == "" { + return nil, pkgerrors.Errorf("Not a valid dependency value") + } + if !(level == "app" || level == "res") { + return nil, pkgerrors.Errorf("Not a valid level name given to create map") + } - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) + var aov map[string]interface{} + json.Unmarshal([]byte(order), &aov) - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) + s := fmt.Sprintf("%vorder", level) + appso := aov[s].([]interface{}) + var instmap = make([]instMap, len(appso)) - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } + var adv map[string]interface{} + json.Unmarshal([]byte(dependency), &adv) + s = fmt.Sprintf("%vdependency", level) + appsd := adv[s].(map[string]interface{}) + for i, u := range appso { + instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} + } - return instmap, nil + return instmap, nil } func deleteResource(clustername string, resname string, respath string) error { @@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error { var gp res.Resource err = gp.Delete(respath, resname, "default", c) if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) + log.Println("Delete resource failed: " + err.Error() + resname) return err } log.Println("Resource succesfully deleted", resname) @@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error { } -func createResource(clustername string, resname string, respath string) error { +func createResource(clustername string, resname string, respath string, label string) error { k8sClient := app.KubernetesClient{} err := k8sClient.Init(clustername, resname) if err != nil { @@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error { var c con.KubernetesConnector c = &k8sClient var gp res.Resource - _, err = gp.Create(respath,"default", c) + _, err = gp.Create(respath, "default", label, c) if err != nil { - log.Println("Create failed: " + err.Error() + resname) + log.Println("Create failed: " + err.Error() + resname) return err } log.Println("Resource succesfully created", resname) @@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, } -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { +func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) if err != nil { return err @@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin if result[0] == "" { return pkgerrors.Errorf("Resource name is nil") } - err = createResource(clustername, result[0], resval.(string)) + err = createResource(clustername, result[0], resval.(string), label) if err != nil { return err } @@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources termination") + } + } + return nil } -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + cid, _ := ac.GetCompositeAppHandle() + + results := strings.Split(cid.(string), "/") + label := results[2] + "-" + appname + + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources instantiation") + } + } + return nil } func terminateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } @@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error { } - func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } + err = status.StartClusterWatcher(appmap.clusters[i].name) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) + } } log.Println("Instantiation of app done: " + appmap.name) return nil } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { +func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = instantiateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = instantiateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil } // Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { +func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = terminateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { return nil } + // Delete all the resources for a given context func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go index 8b45c341..2877e2a3 100644 --- a/src/rsync/pkg/resource/resource.go +++ b/src/rsync/pkg/resource/resource.go @@ -20,16 +20,15 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" ) type Resource struct { } // Create deployment object in a specific Kubernetes cluster -func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) { +func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) { if namespace == "" { namespace = "default" } @@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label unstruct.SetLabels(labels) // This checks if the resource we are creating has a podSpec in it // Eg: Deployment, StatefulSet, Job etc.. // If a PodSpec is found, the label will be added to it too. - connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + connector.TagPodsIfPresent(unstruct, label) gvr := mapping.Resource var createdObj *unstructured.Unstructured @@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne // Delete an existing resource hosted in a specific Kubernetes cluster func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - gvr := mapping.Resource - deletePolicy := metav1.DeletePropagationForeground - opts := &metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - } - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) - case meta.RESTScopeNameRoot: - err = dynClient.Resource(gvr).Delete(resname, opts) - default: - return pkgerrors.New("Got an unknown RESTSCopeName for mappin") - } - - if err != nil { - return pkgerrors.Wrap(err, "Delete object error") - } - return nil + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAMLData(data, unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + gvr := mapping.Resource + deletePolicy := metav1.DeletePropagationForeground + opts := &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) + case meta.RESTScopeNameRoot: + err = dynClient.Resource(gvr).Delete(resname, opts) + default: + return pkgerrors.New("Got an unknown RESTSCopeName for mappin") + } + + if err != nil { + return pkgerrors.Wrap(err, "Delete object error") + } + return nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go new file mode 100644 index 00000000..351da027 --- /dev/null +++ b/src/rsync/pkg/status/status.go @@ -0,0 +1,222 @@ +/* + * Copyright 2020 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package status + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + "sync" + + pkgerrors "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" + v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" + informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" +) + +type channelManager struct { + channels map[string]chan struct{} + sync.Mutex +} + +var channelData channelManager + +const monitorLabel = "emco/deployment-id" + +// HandleStatusUpdate for an application in a cluster +// TODO: Add code for specific handling +func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) { + //status := v.Status.ServiceStatuses + //podStatus := v.Status.PodStatuses + + // Get the contextId from the label (id) + result := strings.SplitN(id, "-", 2) + if result[0] == "" { + logrus.Info(clusterId, "::label is missing an appcontext identifier::", id) + return + } + + if len(result) != 2 { + logrus.Info(clusterId, "::invalid label format::", id) + return + } + + // Get the app from the label (id) + if result[1] == "" { + logrus.Info(clusterId, "::label is missing an app identifier::", id) + return + } + + // Look up the contextId + var ac appcontext.AppContext + _, err := ac.LoadAppContext(result[0]) + if err != nil { + logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err) + return + } + + // produce yaml representation of the status + vjson, err := json.Marshal(v.Status) + if err != nil { + logrus.Info(clusterId, "::Error marshalling status information::", err) + return + } + + // Get the handle for the context/app/cluster status object + handle, err := ac.GetStatusHandle(result[1], clusterId) + if err != nil { + // Expected first time + logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err) + } + + // If status handle was not found, then create the status object in the appcontext + if handle == nil { + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err) + } else { + ac.AddStatus(chandle, string(vjson)) + } + } else { + ac.UpdateStatusValue(handle, string(vjson)) + } + + return +} + +// StartClusterWatcher watches for CR +// configBytes - Kubectl file data +func StartClusterWatcher(clusterId string) error { + + configBytes, err := getKubeConfig(clusterId) + if err != nil { + return err + } + + //key := provider + "+" + name + // Get the lock + channelData.Lock() + defer channelData.Unlock() + // For first time + if channelData.channels == nil { + channelData.channels = make(map[string]chan struct{}) + } + _, ok := channelData.channels[clusterId] + if !ok { + // Create Channel + channelData.channels[clusterId] = make(chan struct{}) + // Create config + config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) + if err != nil { + logrus.Info(fmt.Sprintf("RESTConfigFromKubeConfig error: %s", err.Error())) + return pkgerrors.Wrap(err, "RESTConfigFromKubeConfig error") + } + k8sClient, err := clientset.NewForConfig(config) + if err != nil { + return pkgerrors.Wrap(err, "Clientset NewForConfig error") + } + // Create Informer + mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() + go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer) + } + return nil +} + +// StopClusterWatcher stop watching a cluster +func StopClusterWatcher(clusterId string) { + //key := provider + "+" + name + if channelData.channels != nil { + c, ok := channelData.channels[clusterId] + if ok { + close(c) + } + } +} + +// CloseAllClusterWatchers close all channels +func CloseAllClusterWatchers() { + if channelData.channels == nil { + return + } + // Close all Channels to stop all watchers + for _, e := range channelData.channels { + close(e) + } +} + +// Per Cluster Go routine to watch CR +func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) { + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(clusterId, l, v) + } + } + }, + UpdateFunc: func(oldObj, obj interface{}) { + v, ok := obj.(*v1alpha1.ResourceBundleState) + if ok { + labels := v.GetLabels() + l, ok := labels[monitorLabel] + if ok { + HandleStatusUpdate(clusterId, l, v) + } + } + }, + DeleteFunc: func(obj interface{}) { + // Ignore it + }, + } + s.AddEventHandler(handlers) + s.Run(c) +} + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +// TODO - consolidate with other rsync methods to get kubeconfig files +func getKubeConfig(clustername string) ([]byte, error) { + + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil +} |