Diffstat (limited to 'src/rsync/pkg/context')
-rw-r--r-- | src/rsync/pkg/context/context.go | 303
1 file changed, 244 insertions(+), 59 deletions(-)
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index cc7773b8..f77482e6 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -20,11 +20,12 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "log"
     "strings"
+    "time"
 
     "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
     "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
+    "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus"
     kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
     connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
     utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
@@ -38,40 +39,49 @@ type CompositeAppContext struct {
     cid interface{}
 }
 
-func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) {
+func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) {
     var byteRes []byte
     rh, err := ac.GetResourceHandle(app, cluster, name)
     if err != nil {
-        return nil, err
+        return nil, nil, err
+    }
+    sh, err := ac.GetLevelHandle(rh, "status")
+    if err != nil {
+        return nil, nil, err
     }
     resval, err := ac.GetValue(rh)
     if err != nil {
-        return nil, err
+        return nil, sh, err
     }
     if resval != "" {
         result := strings.Split(name, "+")
         if result[0] == "" {
-            return nil, pkgerrors.Errorf("Resource name is nil %s:", name)
+            return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name)
         }
         byteRes = []byte(fmt.Sprintf("%v", resval.(interface{})))
     } else {
-        return nil, pkgerrors.Errorf("Resource value is nil %s", name)
+        return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name)
     }
-    return byteRes, nil
+    return byteRes, sh, nil
 }
 
 func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
-    res, err := getRes(ac, name, app, cluster)
+    res, sh, err := getRes(ac, name, app, cluster)
     if err != nil {
+        if sh != nil {
+            ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+        }
         return err
     }
     if err := c.Delete(res); err != nil {
+        ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
         logutils.Error("Failed to delete res", logutils.Fields{
             "error":    err,
             "resource": name,
         })
         return err
     }
+    ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted})
     logutils.Info("Deleted::", logutils.Fields{
         "cluster":  cluster,
         "resource": name,
@@ -80,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri
 }
 
 func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
-    res, err := getRes(ac, name, app, cluster)
+    res, sh, err := getRes(ac, name, app, cluster)
     if err != nil {
+        if sh != nil {
+            ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+        }
         return err
     }
     //Decode the yaml to create a runtime.Object
@@ -116,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
         return err
     }
     if err := c.Apply(b); err != nil {
+        ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
         logutils.Error("Failed to apply res", logutils.Fields{
             "error":    err,
             "resource": name,
         })
         return err
     }
+    ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied})
     logutils.Info("Installed::", logutils.Fields{
         "cluster":  cluster,
         "resource": name,
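
Reviewer note: the hunks above change getRes to also return the resource's status handle (sh), so terminateResource and instantiateResource can record Failed, Deleted, or Applied for every outcome, including early exits. A minimal sketch of that per-resource status pattern, using hypothetical stand-in types rather than the real appcontext API:

    package main

    import "fmt"

    // Status loosely mirrors the resourcestatus.RsyncStatusEnum values.
    type Status string

    const (
        Failed  Status = "Failed"
        Applied Status = "Applied"
        Deleted Status = "Deleted"
    )

    // statusStore is a toy stand-in for AppContext status handles.
    type statusStore map[string]Status

    // applyResource mirrors instantiateResource: record Failed on any error,
    // and Applied only after a successful apply.
    func applyResource(s statusStore, name string, apply func() error) error {
        if err := apply(); err != nil {
            s[name] = Failed
            return err
        }
        s[name] = Applied
        return nil
    }

    func main() {
        s := statusStore{}
        applyResource(s, "deployment+web", func() error { return nil })
        applyResource(s, "service+web", func() error { return fmt.Errorf("apply failed") })
        fmt.Println(s) // map[deployment+web:Applied service+web:Failed]
    }

The point of the pattern is that the status write happens on every exit path, so a watcher polling the AppContext never sees a resource stuck in its previous state after a failure.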
logutils.Info("Installed::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -129,6 +144,116 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return nil } +// Wait for 2 secs +const waitTime = 2 + +func waitForClusterReady(c *kubeclient.Client, cluster string) error { + for { + if err := c.IsReachable(); err != nil { + // TODO: Add more realistic error checking + // TODO: Add Incremental wait logic here + time.Sleep(waitTime * time.Second) + } else { + break + } + } + logutils.Info("Cluster is reachable::", logutils.Fields{ + "cluster": cluster, + }) + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + h, err := ac.GetCompositeAppHandle() + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(h, "status") + if sh == nil { + _, err = ac.AddLevelValue(h, "status", acStatus) + } else { + err = ac.UpdateValue(sh, acStatus) + } + if err != nil { + return err + } + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + statusPending := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Pending, + } + statusDeleted := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Deleted, + } + + appsOrder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + + for _, app := range appList["apporder"] { + clusterNames, err := ac.GetClusterNames(app) + if err != nil { + return err + } + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + resorder, err := ac.GetResourceInstruction(app, cluster, "order") + if err != nil { + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for _, res := range aov["resorder"] { + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(rh, "status") + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusPending) + } + if err != nil { + return err + } + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusDeleted) + } else { + s, err := ac.GetValue(sh) + if err != nil { + return err + } + rStatus := resourcestatus.ResourceStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &rStatus) + if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied { + err = ac.UpdateStatusValue(sh, statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusDeleted) + } + if err != nil { + return err + } + } + } else { + return pkgerrors.Errorf("Error intializing AppContext Resource Statuses") + } + } + } + } + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -139,6 +264,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st }) return err } + // TODO: Check reachability? 
@@ -181,17 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label
     return nil
 }
 
+func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error {
+    sh, err := ac.GetLevelHandle(handle, "status")
+    if err != nil {
+        return err
+    }
+    s, err := ac.GetValue(sh)
+    if err != nil {
+        return err
+    }
+    acStatus := appcontext.AppContextStatus{}
+    js, _ := json.Marshal(s)
+    json.Unmarshal(js, &acStatus)
+
+    if failure {
+        acStatus.Status = appcontext.AppContextStatusEnum.Failed
+    } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+        acStatus.Status = appcontext.AppContextStatusEnum.Instantiated
+    } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+        acStatus.Status = appcontext.AppContextStatusEnum.Terminated
+    } else {
+        return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus)
+    }
+
+    err = ac.UpdateValue(sh, acStatus)
+    if err != nil {
+        return err
+    }
+    return nil
+}
+
 type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
 
 type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
 
-func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error {
+func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error {
+    con := connector.Init(cid)
+    //Cleanup
+    defer con.RemoveClient()
     ac := appcontext.AppContext{}
-    g, _ := errgroup.WithContext(context.Background())
-    _, err := ac.LoadAppContext(cid)
+    h, err := ac.LoadAppContext(cid)
+    if err != nil {
+        return err
+    }
+
+    // initialize appcontext status
+    err = initializeAppContextStatus(ac, acStatus)
+    if err != nil {
+        return err
+    }
+
+    // initialize the resource status values before proceeding with the function
+    err = initializeResourceStatus(ac, acStatus)
     if err != nil {
         return err
     }
+
     appsOrder, err := ac.GetAppInstruction("order")
     if err != nil {
         return err
@@ -203,9 +374,9 @@
         "string": appList,
     })
     id, _ := ac.GetCompositeAppHandle()
-
+    g, _ := errgroup.WithContext(context.Background())
+    // Iterate over all the subapps
     for _, app := range appList["apporder"] {
-
         appName := app
         results := strings.Split(id.(string), "/")
         label := results[2] + "-" + app
@@ -214,14 +385,17 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn
         if err != nil {
             return err
         }
-        rg, _ := errgroup.WithContext(context.Background())
+        // Iterate over all clusters
         for k := 0; k < len(clusterNames); k++ {
             cluster := clusterNames[k]
             err = status.StartClusterWatcher(cluster)
             if err != nil {
-                log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
+                logutils.Error("Error starting Cluster Watcher", logutils.Fields{
+                    "error":   err,
+                    "cluster": cluster,
+                })
             }
-            rg.Go(func() error {
+            g.Go(func() error {
                 c, err := con.GetClient(cluster)
                 if err != nil {
                     logutils.Error("Error in creating kubeconfig client", logutils.Fields{
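
Reviewer note: the per-app rg errgroup (and its rg.Wait) is gone; apps and clusters now share a single errgroup, so one g.Wait() at the end collects the first error from any goroutine. The fan-out pattern in isolation (cluster names are placeholders):

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        g, _ := errgroup.WithContext(context.Background())
        for _, cluster := range []string{"edge01", "edge02"} {
            // Capture the loop variable per iteration, as the patch does
            // with cluster := clusterNames[k] and appName := app.
            cluster := cluster
            g.Go(func() error {
                fmt.Println("syncing", cluster)
                return nil // per-cluster resource handling goes here
            })
        }
        // First non-nil error from any goroutine, or nil once all finish.
        if err := g.Wait(); err != nil {
            fmt.Println("sync failed:", err)
        }
    }

Nesting g.Go calls on the same group, as the patch does (one goroutine per app, each spawning one per cluster), is legal; Wait simply blocks until every registered goroutine returns.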
@@ -238,75 +412,86 @@
                 }
                 var aov map[string][]string
                 json.Unmarshal([]byte(resorder.(string)), &aov)
-                for i, res := range aov["resorder"] {
-                    err = f(ac, c, res, appName, cluster, label)
+                // Keep retrying for reachability
+                for {
+                    // Wait for cluster to be reachable
+                    err = waitForClusterReady(c, cluster)
                     if err != nil {
-                        logutils.Error("Error in resource %s: %v", logutils.Fields{
-                            "error":    err,
-                            "cluster":  cluster,
-                            "resource": res,
-                        })
-                        if breakonError {
-                            // handle status tracking before exiting if at least one resource got handled
-                            if i > 0 {
-                                serr := sfn(c, appName, cluster, label)
-                                if serr != nil {
-                                    logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                        // TODO: Add error handling
+                        return err
+                    }
+                    reachable := true
+                    // Handle all resources in order
+                    for i, res := range aov["resorder"] {
+                        err = f(ac, c, res, appName, cluster, label)
+                        if err != nil {
+                            logutils.Error("Error in resource %s: %v", logutils.Fields{
+                                "error":    err,
+                                "cluster":  cluster,
+                                "resource": res,
+                            })
+                            // If failure is due to reachability issues, start retrying
+                            if err = c.IsReachable(); err != nil {
+                                reachable = false
+                                break
+                            }
+                            if breakonError {
+                                // handle status tracking before exiting if at least one resource got handled
+                                if i > 0 {
+                                    serr := sfn(c, appName, cluster, label)
+                                    if serr != nil {
+                                        logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                                    }
                                 }
+                                return err
                             }
-                            return err
                         }
                     }
-                }
-                serr := sfn(c, appName, cluster, label)
-                if serr != nil {
-                    logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                    // Check if the break from the loop was due to reachability issues
+                    if reachable {
+                        serr := sfn(c, appName, cluster, label)
+                        if serr != nil {
+                            logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                        }
+                        // Done processing cluster without errors
+                        return nil
+                    }
                 }
                 return nil
             })
         }
-        if err := rg.Wait(); err != nil {
-            logutils.Error("Encountered error in App cluster", logutils.Fields{
-                "error": err,
-            })
-            return err
-        }
         return nil
     })
 }
+    // Wait for all subtasks to complete
     if err := g.Wait(); err != nil {
+        uperr := updateEndingAppContextStatus(ac, h, true)
+        if uperr != nil {
+            logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr})
+        }
         logutils.Error("Encountered error", logutils.Fields{
             "error": err,
         })
         return err
     }
+    err = updateEndingAppContextStatus(ac, h, false)
+    if err != nil {
+        logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err})
+        return err
+    }
     return nil
 }
 
 // InstantiateComApp Instantiate Apps in Composite App
 func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
-    con := connector.Init(cid)
-    err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true)
-    if err != nil {
-        logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
-        return err
-    }
-    //Cleanup
-    con.RemoveClient()
+    go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating},
+        instantiateResource, addStatusTracker, true)
     return nil
 }
 
 // TerminateComApp Terminates Apps in Composite App
 func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
-    con := connector.Init(cid)
-    err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false)
-    if err != nil {
-        logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
-            "error": err,
-        })
-        return err
-    }
-    //Cleanup
-    con.RemoveClient()
+    go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating},
+        terminateResource, deleteStatusTracker, false)
     return nil
 }
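
Reviewer note: InstantiateComApp and TerminateComApp are now fire-and-forget; applyFnComApp runs in a goroutine and its return value is dropped, so callers learn the outcome only through the AppContext status that updateEndingAppContextStatus writes (Instantiated, Terminated, or Failed). The final hunk also wraps resource handling in a retry loop that treats connectivity failures as retryable. That control flow, condensed (helper names here are hypothetical):

    package main

    import (
        "fmt"
        "time"
    )

    const waitTime = 2 // seconds, matching the patch

    // processUntilReachable retries the resource loop whenever a failure
    // turns out to be a connectivity problem, mirroring applyFnComApp.
    func processUntilReachable(isReachable func() error, handle func() error) error {
        for {
            // Block until the cluster answers at all (waitForClusterReady).
            for isReachable() != nil {
                time.Sleep(waitTime * time.Second)
            }
            if err := handle(); err != nil {
                if isReachable() != nil {
                    continue // connectivity blip: wait and retry the loop
                }
                return err // a real error: surface it
            }
            return nil
        }
    }

    func main() {
        err := processUntilReachable(
            func() error { return nil }, // always reachable in this toy run
            func() error { return nil },
        )
        fmt.Println(err) // <nil>
    }

One caveat the TODOs in the patch already acknowledge: the wait is a fixed 2-second poll with no backoff and no upper bound, so a permanently unreachable cluster keeps its goroutine (and the AppContext status) in Instantiating or Terminating indefinitely.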