diff options
author | Ritu Sood <Ritu.Sood@intel.com> | 2020-08-14 22:57:11 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-08-14 22:57:11 +0000 |
commit | 6e5ca4741dab0de3b4d89f410f0ff9d0313d6aee (patch) | |
tree | 6f4dbbdf8c13944cf5b95d2d59fd565ebee778b9 /src/rsync/pkg/context/context.go | |
parent | 6e671649f67e9bbe2ebdea18efa18e9c2c506946 (diff) | |
parent | 709d6d17a3b2f8bc9d46034295bd7c5a7fb76107 (diff) |
Merge "Add appcontext state, status and resource status"
Diffstat (limited to 'src/rsync/pkg/context/context.go')
-rw-r--r-- | src/rsync/pkg/context/context.go | 193 |
1 files changed, 176 insertions, 17 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index 2fadfd00..f77482e6 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -20,12 +20,12 @@ import ( "context" "encoding/json" "fmt" - "log" "strings" "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus" kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" @@ -39,40 +39,49 @@ type CompositeAppContext struct { cid interface{} } -func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) { +func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) { var byteRes []byte rh, err := ac.GetResourceHandle(app, cluster, name) if err != nil { - return nil, err + return nil, nil, err + } + sh, err := ac.GetLevelHandle(rh, "status") + if err != nil { + return nil, nil, err } resval, err := ac.GetValue(rh) if err != nil { - return nil, err + return nil, sh, err } if resval != "" { result := strings.Split(name, "+") if result[0] == "" { - return nil, pkgerrors.Errorf("Resource name is nil %s:", name) + return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name) } byteRes = []byte(fmt.Sprintf("%v", resval.(interface{}))) } else { - return nil, pkgerrors.Errorf("Resource value is nil %s", name) + return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name) } - return byteRes, nil + return byteRes, sh, nil } func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } if err := c.Delete(res); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to delete res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted}) logutils.Info("Deleted::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -81,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri } func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } //Decode the yaml to create a runtime.Object @@ -117,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return err } if err := c.Apply(b); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to apply res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied}) logutils.Info("Installed::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -149,6 +163,97 @@ func waitForClusterReady(c *kubeclient.Client, cluster string) error { return nil } +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + h, err := ac.GetCompositeAppHandle() + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(h, "status") + if sh == nil { + _, err = ac.AddLevelValue(h, "status", acStatus) + } else { + err = ac.UpdateValue(sh, acStatus) + } + if err != nil { + return err + } + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + statusPending := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Pending, + } + statusDeleted := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Deleted, + } + + appsOrder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + + for _, app := range appList["apporder"] { + clusterNames, err := ac.GetClusterNames(app) + if err != nil { + return err + } + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + resorder, err := ac.GetResourceInstruction(app, cluster, "order") + if err != nil { + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for _, res := range aov["resorder"] { + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(rh, "status") + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusPending) + } + if err != nil { + return err + } + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusDeleted) + } else { + s, err := ac.GetValue(sh) + if err != nil { + return err + } + rStatus := resourcestatus.ResourceStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &rStatus) + if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied { + err = ac.UpdateStatusValue(sh, statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusDeleted) + } + if err != nil { + return err + } + } + } else { + return pkgerrors.Errorf("Error intializing AppContext Resource Statuses") + } + } + } + } + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -202,20 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label return nil } +func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error { + sh, err := ac.GetLevelHandle(handle, "status") + if err != nil { + return err + } + s, err := ac.GetValue(sh) + if err != nil { + return err + } + acStatus := appcontext.AppContextStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &acStatus) + + if failure { + acStatus.Status = appcontext.AppContextStatusEnum.Failed + } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + acStatus.Status = appcontext.AppContextStatusEnum.Instantiated + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + acStatus.Status = appcontext.AppContextStatusEnum.Terminated + } else { + return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus) + } + + err = ac.UpdateValue(sh, acStatus) + if err != nil { + return err + } + return nil +} + type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error { - +func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error { con := connector.Init(cid) //Cleanup defer con.RemoveClient() ac := appcontext.AppContext{} - _, err := ac.LoadAppContext(cid) + h, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + + // initialize appcontext status + err = initializeAppContextStatus(ac, acStatus) + if err != nil { + return err + } + + // initialize the resource status values before proceeding with the function + err = initializeResourceStatus(ac, acStatus) if err != nil { return err } + appsOrder, err := ac.GetAppInstruction("order") if err != nil { return err @@ -243,7 +390,10 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error cluster := clusterNames[k] err = status.StartClusterWatcher(cluster) if err != nil { - log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) + logutils.Error("Error starting Cluster Watcher", logutils.Fields{ + "error": err, + "cluster": cluster, + }) } g.Go(func() error { c, err := con.GetClient(cluster) @@ -315,24 +465,33 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error } // Wait for all subtasks to complete if err := g.Wait(); err != nil { + uperr := updateEndingAppContextStatus(ac, h, true) + if uperr != nil { + logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr}) + } logutils.Error("Encountered error", logutils.Fields{ "error": err, }) return err } + err = updateEndingAppContextStatus(ac, h, false) + if err != nil { + logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err}) + return err + } return nil } // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - // Start handling and return grpc immediately - go applyFnComApp(cid, instantiateResource, addStatusTracker, true) + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating}, + instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - // Start handling and return grpc immediately - go applyFnComApp(cid, terminateResource, deleteStatusTracker, true) + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating}, + terminateResource, deleteStatusTracker, false) return nil } |