diff options
author | Eric Multanen <eric.w.multanen@intel.com> | 2020-07-01 23:30:49 -0700 |
---|---|---|
committer | Eric Multanen <eric.w.multanen@intel.com> | 2020-07-08 13:36:34 -0700 |
commit | e06b947b03c3fcce2c954feb68890a519c7740c3 (patch) | |
tree | 5617b570ea85bf07dd76c6410975059acc23cc70 /src/rsync/pkg | |
parent | a43096cbdca3fdabeda3d404bedadd7a7272a3c2 (diff) |
Adds composite app status update and query
This patch provides the basic framework for supporting
monitoring of composite application resources in clusters.
1. Updates to the monitor files for use with v2.
2. Invokes the Watcher process per cluster/app when the
app is instantiated.
3. Adds a ResourceBundleState CR resource to the cluster/app
so that monitor will be able to update status to it.
4. Watcher updates appropriate appcontext status object
when updates are made in clusters by monitor
5. Update appcontext library to define a status handle
and object at the app/cluster level
6. Labels resources with an appropriate tracking label
to coordinate with the ResourceBundleState CR
Issue-ID: MULTICLOUD-1042
Signed-off-by: Eric Multanen <eric.w.multanen@intel.com>
Change-Id: If007c1fd86ca7a65bb941d1776cfd2d3afed766b
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r-- | src/rsync/pkg/connector/connector.go | 6 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 400 | ||||
-rw-r--r-- | src/rsync/pkg/resource/resource.go | 91 | ||||
-rw-r--r-- | src/rsync/pkg/status/status.go | 117 |
4 files changed, 353 insertions, 261 deletions
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go index 6e17f87a..fc8aa839 100644 --- a/src/rsync/pkg/connector/connector.go +++ b/src/rsync/pkg/connector/connector.go @@ -19,8 +19,6 @@ package connector import ( "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -52,7 +50,7 @@ type KubernetesConnector interface { // Reference is the interface that is implemented type Reference interface { //Create a kubernetes resource described by the yaml in yamlFilePath - Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error) + Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error) //Delete a kubernetes resource described in the provided namespace Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error } @@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = tag + labels["emco/deployment-id"] = tag podTemplateSpec.SetLabels(labels) updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index e5da1296..7e0fce3c 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -18,67 +18,68 @@ package context import ( "encoding/json" - "fmt" - "log" - "sync" - "strings" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - pkgerrors "github.com/pkg/errors" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + "fmt" + "log" + "strings" + "sync" + + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/rsync/pkg/app" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" + pkgerrors "github.com/pkg/errors" ) type CompositeAppContext struct { - cid interface{} + cid interface{} appsorder string appsdependency string - appsmap []instMap + appsmap []instMap } type clusterInfo struct { - name string + name string resorder string resdependency string - ressmap []instMap + ressmap []instMap } type instMap struct { - name string - depinfo string - status string - rerr error + name string + depinfo string + status string + rerr error clusters []clusterInfo } func getInstMap(order string, dependency string, level string) ([]instMap, error) { - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } + if order == "" { + return nil, pkgerrors.Errorf("Not a valid order value") + } + if dependency == "" { + return nil, pkgerrors.Errorf("Not a valid dependency value") + } + if !(level == "app" || level == "res") { + return nil, pkgerrors.Errorf("Not a valid level name given to create map") + } - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) + var aov map[string]interface{} + json.Unmarshal([]byte(order), &aov) - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) + s := fmt.Sprintf("%vorder", level) + appso := aov[s].([]interface{}) + var instmap = make([]instMap, len(appso)) - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } + var adv map[string]interface{} + json.Unmarshal([]byte(dependency), &adv) + s = fmt.Sprintf("%vdependency", level) + appsd := adv[s].(map[string]interface{}) + for i, u := range appso { + instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} + } - return instmap, nil + return instmap, nil } func deleteResource(clustername string, resname string, respath string) error { @@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error { var gp res.Resource err = gp.Delete(respath, resname, "default", c) if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) + log.Println("Delete resource failed: " + err.Error() + resname) return err } log.Println("Resource succesfully deleted", resname) @@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error { } -func createResource(clustername string, resname string, respath string) error { +func createResource(clustername string, resname string, respath string, label string) error { k8sClient := app.KubernetesClient{} err := k8sClient.Init(clustername, resname) if err != nil { @@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error { var c con.KubernetesConnector c = &k8sClient var gp res.Resource - _, err = gp.Create(respath,"default", c) + _, err = gp.Create(respath, "default", label, c) if err != nil { - log.Println("Create failed: " + err.Error() + resname) + log.Println("Create failed: " + err.Error() + resname) return err } log.Println("Resource succesfully created", resname) @@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, } -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { +func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) if err != nil { return err @@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin if result[0] == "" { return pkgerrors.Errorf("Resource name is nil") } - err = createResource(clustername, result[0], resval.(string)) + err = createResource(clustername, result[0], resval.(string), label) if err != nil { return err } @@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources termination") + } + } + return nil } -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i:=0; i<len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <- chans[index] - if c != index { +func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + cid, _ := ac.GetCompositeAppHandle() + + results := strings.Split(cid.(string), "/") + label := results[2] + "-" + appname + + for l := range chans { + chans[l] = make(chan int) + } + for i := 0; i < len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <-chans[index] + if c != index { ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) - for j:=0; j<len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k:=0; k<len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } - } - return nil + } + ressmap[index].status = "start" + } + ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) + for j := 0; j < len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k := 0; k < len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources instantiation") + } + } + return nil } func terminateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } @@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error { } - func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - for i:=0; i<len(appmap.clusters); i++ { + for i := 0; i < len(appmap.clusters); i++ { err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) + appmap.clusters[i].name) if err != nil { return err } + err = status.StartClusterWatcher(appmap.clusters[i].name) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) + } } log.Println("Instantiation of app done: " + appmap.name) return nil } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { +func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -320,17 +329,17 @@ func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = instantiateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = instantiateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil } // Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { +func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { var wg sync.WaitGroup var chans = make([]chan int, len(appsmap)) for l := range chans { chans[l] = make(chan int) } - for i:=0; i<len(appsmap); i++ { + for i := 0; i < len(appsmap); i++ { wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { + go func(index int) { + if appsmap[index].depinfo == "go" { appsmap[index].status = "start" } else { appsmap[index].status = "waiting" - c := <- chans[index] + c := <-chans[index] if c != index { appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") wg.Done() return } @@ -426,17 +435,17 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { } appsmap[index].rerr = terminateApp(ac, appsmap[index]) appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) - for j:=0; j<len(appsmap); j++ { + waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) + for j := 0; j < len(appsmap); j++ { if appsmap[j].depinfo == waitstr { chans[j] <- j } } wg.Done() - }(i) - } + }(i) + } wg.Wait() - for k:=0; k<len(appsmap); k++ { + for k := 0; k < len(appsmap); k++ { if appsmap[k].rerr != nil { return pkgerrors.Errorf("error during apps instantiation") } @@ -444,50 +453,51 @@ func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { return nil } + // Delete all the resources for a given context func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { ac := appcontext.AppContext{} _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } + if err != nil { + return err + } instca.cid = cid appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsorder = appsorder.(string) appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } + if err != nil { + return err + } instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") - if err != nil { - return err - } + instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + if err != nil { + return err + } - for j:=0; j<len(instca.appsmap); j++ { + for j := 0; j < len(instca.appsmap); j++ { clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) if err != nil { - return err + return err } instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k:=0; k<len(clusternames); k++ { + for k := 0; k < len(clusternames); k++ { instca.appsmap[j].clusters[k].name = clusternames[k] resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + instca.appsmap[j].name, clusternames[k], "order") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resorder = resorder.(string) resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + instca.appsmap[j].name, clusternames[k], "dependency") if err != nil { - return err + return err } instca.appsmap[j].clusters[k].resdependency = resdependency.(string) @@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { instca.appsmap[j].clusters[k].resorder, instca.appsmap[j].clusters[k].resdependency, "res") if err != nil { - return err + return err } } } err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } + if err != nil { + return err + } return nil diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go index 8b45c341..2877e2a3 100644 --- a/src/rsync/pkg/resource/resource.go +++ b/src/rsync/pkg/resource/resource.go @@ -20,16 +20,15 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" - "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" ) type Resource struct { } // Create deployment object in a specific Kubernetes cluster -func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) { +func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) { if namespace == "" { namespace = "default" } @@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne if labels == nil { labels = map[string]string{} } - labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label unstruct.SetLabels(labels) // This checks if the resource we are creating has a podSpec in it // Eg: Deployment, StatefulSet, Job etc.. // If a PodSpec is found, the label will be added to it too. - connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + connector.TagPodsIfPresent(unstruct, label) gvr := mapping.Resource var createdObj *unstructured.Unstructured @@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne // Delete an existing resource hosted in a specific Kubernetes cluster func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - gvr := mapping.Resource - deletePolicy := metav1.DeletePropagationForeground - opts := &metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - } - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) - case meta.RESTScopeNameRoot: - err = dynClient.Resource(gvr).Delete(resname, opts) - default: - return pkgerrors.New("Got an unknown RESTSCopeName for mappin") - } - - if err != nil { - return pkgerrors.Wrap(err, "Delete object error") - } - return nil + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAMLData(data, unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + gvr := mapping.Resource + deletePolicy := metav1.DeletePropagationForeground + opts := &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) + case meta.RESTScopeNameRoot: + err = dynClient.Resource(gvr).Delete(resname, opts) + default: + return pkgerrors.New("Got an unknown RESTSCopeName for mappin") + } + + if err != nil { + return pkgerrors.Wrap(err, "Delete object error") + } + return nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go index 28bffefd..351da027 100644 --- a/src/rsync/pkg/status/status.go +++ b/src/rsync/pkg/status/status.go @@ -17,16 +17,20 @@ package status import ( + "encoding/base64" "encoding/json" "fmt" + "strings" "sync" pkgerrors "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id" // HandleStatusUpdate for an application in a cluster // TODO: Add code for specific handling -func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error { - logrus.Info("label::", id) +func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) { //status := v.Status.ServiceStatuses //podStatus := v.Status.PodStatuses - // Store Pod Status in app context - out, _ := json.Marshal(v.Status) - logrus.Info("Status::", string(out)) - return nil + + // Get the contextId from the label (id) + result := strings.SplitN(id, "-", 2) + if result[0] == "" { + logrus.Info(clusterId, "::label is missing an appcontext identifier::", id) + return + } + + if len(result) != 2 { + logrus.Info(clusterId, "::invalid label format::", id) + return + } + + // Get the app from the label (id) + if result[1] == "" { + logrus.Info(clusterId, "::label is missing an app identifier::", id) + return + } + + // Look up the contextId + var ac appcontext.AppContext + _, err := ac.LoadAppContext(result[0]) + if err != nil { + logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err) + return + } + + // produce yaml representation of the status + vjson, err := json.Marshal(v.Status) + if err != nil { + logrus.Info(clusterId, "::Error marshalling status information::", err) + return + } + + // Get the handle for the context/app/cluster status object + handle, err := ac.GetStatusHandle(result[1], clusterId) + if err != nil { + // Expected first time + logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err) + } + + // If status handle was not found, then create the status object in the appcontext + if handle == nil { + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err) + } else { + ac.AddStatus(chandle, string(vjson)) + } + } else { + ac.UpdateStatusValue(handle, string(vjson)) + } + + return } // StartClusterWatcher watches for CR // configBytes - Kubectl file data -func StartClusterWatcher(provider, name string, configBytes []byte) error { - key := provider + "+" + name +func StartClusterWatcher(clusterId string) error { + + configBytes, err := getKubeConfig(clusterId) + if err != nil { + return err + } + + //key := provider + "+" + name // Get the lock channelData.Lock() defer channelData.Unlock() @@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { if channelData.channels == nil { channelData.channels = make(map[string]chan struct{}) } - _, ok := channelData.channels[key] + _, ok := channelData.channels[clusterId] if !ok { // Create Channel - channelData.channels[key] = make(chan struct{}) + channelData.channels[clusterId] = make(chan struct{}) // Create config config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) if err != nil { @@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { // Create Informer mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() - go scheduleStatus(provider, name, channelData.channels[key], mInformer) + go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer) } return nil } // StopClusterWatcher stop watching a cluster -func StopClusterWatcher(provider, name string) { - key := provider + "+" + name +func StopClusterWatcher(clusterId string) { + //key := provider + "+" + name if channelData.channels != nil { - c, ok := channelData.channels[key] + c, ok := channelData.channels[clusterId] if ok { close(c) } @@ -108,7 +167,7 @@ func CloseAllClusterWatchers() { } // Per Cluster Go routine to watch CR -func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) { +func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) { handlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { v, ok := obj.(*v1alpha1.ResourceBundleState) @@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde s.AddEventHandler(handlers) s.Run(c) } + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +// TODO - consolidate with other rsync methods to get kubeconfig files +func getKubeConfig(clustername string) ([]byte, error) { + + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil +} |