diff options
Diffstat (limited to 'src/rsync/pkg/context/context.go')
-rw-r--r-- | src/rsync/pkg/context/context.go | 381 |
1 files changed, 359 insertions, 22 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index f77482e6..4b886ec7 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" @@ -36,7 +37,9 @@ import ( ) type CompositeAppContext struct { - cid interface{} + cid interface{} + chans []chan bool + mutex sync.Mutex } func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) { @@ -144,26 +147,150 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return nil } +func updateResourceStatus(ac appcontext.AppContext, resState resourcestatus.ResourceStatus, app string, cluster string, aov map[string][]string) error { + + for _, res := range aov["resorder"] { + + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(rh, "status") + if err != nil { + return err + } + + s, err := ac.GetValue(sh) + if err != nil { + return err + } + rStatus := resourcestatus.ResourceStatus{} + js, err := json.Marshal(s) + if err != nil { + return err + } + err = json.Unmarshal(js, &rStatus) + if err != nil { + return err + } + // no need to update a status that has reached a 'done' status + if rStatus.Status == resourcestatus.RsyncStatusEnum.Deleted || + rStatus.Status == resourcestatus.RsyncStatusEnum.Applied || + rStatus.Status == resourcestatus.RsyncStatusEnum.Failed { + continue + } + + err = ac.UpdateStatusValue(sh, resState) + if err != nil { + return err + } + } + + return nil + +} + +// return true if all resources have reached a 'done' status - e.g. Applied, Deleted or Failed +func allResourcesDone(ac appcontext.AppContext, app string, cluster string, aov map[string][]string) bool { + + for _, res := range aov["resorder"] { + + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return false + } + sh, err := ac.GetLevelHandle(rh, "status") + if err != nil { + return false + } + + s, err := ac.GetValue(sh) + if err != nil { + return false + } + rStatus := resourcestatus.ResourceStatus{} + js, err := json.Marshal(s) + if err != nil { + return false + } + err = json.Unmarshal(js, &rStatus) + if err != nil { + return false + } + if rStatus.Status != resourcestatus.RsyncStatusEnum.Deleted && + rStatus.Status != resourcestatus.RsyncStatusEnum.Applied && + rStatus.Status != resourcestatus.RsyncStatusEnum.Failed { + return false + } + } + + return true + +} + // Wait for 2 secs const waitTime = 2 -func waitForClusterReady(c *kubeclient.Client, cluster string) error { +func waitForClusterReady(instca *CompositeAppContext, ac appcontext.AppContext, c *kubeclient.Client, appname string, cluster string, aov map[string][]string) error { + + forceDone := false + resStateUpdated := false + ch := addChan(instca) + + rch := make(chan error, 1) + checkReachable := func() { + err := c.IsReachable() + rch <- err + } + + go checkReachable() +Loop: for { - if err := c.IsReachable(); err != nil { - // TODO: Add more realistic error checking - // TODO: Add Incremental wait logic here - time.Sleep(waitTime * time.Second) - } else { + select { + case rerr := <-rch: + if rerr == nil { + break Loop + } else { + logutils.Info("Cluster is not reachable - keep trying::", logutils.Fields{"cluster": cluster}) + go checkReachable() + } + case <-ch: + statusFailed := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Failed, + } + err := updateResourceStatus(ac, statusFailed, appname, cluster, aov) + if err != nil { + deleteChan(instca, ch) + return err + } + forceDone = true + break Loop + case <-time.After(waitTime * time.Second): + // on first timeout - cluster is apparently not reachable, update resources in + // this group to 'Retrying' + if !resStateUpdated { + statusRetrying := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Retrying, + } + err := updateResourceStatus(ac, statusRetrying, appname, cluster, aov) + if err != nil { + deleteChan(instca, ch) + return err + } + resStateUpdated = true + } break } } - logutils.Info("Cluster is reachable::", logutils.Fields{ - "cluster": cluster, - }) + + deleteChan(instca, ch) + if forceDone { + return pkgerrors.Errorf("Termination of rsync cluster retry: " + cluster) + } return nil } -// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +// initializeAppContextStatus sets the initial status of every resource appropriately based on the state of the AppContext func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { h, err := ac.GetCompositeAppHandle() if err != nil { @@ -320,12 +447,18 @@ func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, js, _ := json.Marshal(s) json.Unmarshal(js, &acStatus) - if failure { - acStatus.Status = appcontext.AppContextStatusEnum.Failed - } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { - acStatus.Status = appcontext.AppContextStatusEnum.Instantiated + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if failure { + acStatus.Status = appcontext.AppContextStatusEnum.InstantiateFailed + } else { + acStatus.Status = appcontext.AppContextStatusEnum.Instantiated + } } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { - acStatus.Status = appcontext.AppContextStatusEnum.Terminated + if failure { + acStatus.Status = appcontext.AppContextStatusEnum.TerminateFailed + } else { + acStatus.Status = appcontext.AppContextStatusEnum.Terminated + } } else { return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus) } @@ -337,20 +470,197 @@ func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, return nil } +func getAppContextStatus(ac appcontext.AppContext) (*appcontext.AppContextStatus, error) { + + h, err := ac.GetCompositeAppHandle() + if err != nil { + return nil, err + } + sh, err := ac.GetLevelHandle(h, "status") + if err != nil { + return nil, err + } + s, err := ac.GetValue(sh) + if err != nil { + return nil, err + } + acStatus := appcontext.AppContextStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &acStatus) + + return &acStatus, nil + +} + type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error { - con := connector.Init(cid) +func addChan(instca *CompositeAppContext) chan bool { + + instca.mutex.Lock() + c := make(chan bool) + instca.chans = append(instca.chans, c) + instca.mutex.Unlock() + + return c +} + +func deleteChan(instca *CompositeAppContext, c chan bool) error { + + var i int + instca.mutex.Lock() + for i = 0; i < len(instca.chans); i++ { + if instca.chans[i] == c { + break + } + } + + if i == len(instca.chans) { + instca.mutex.Unlock() + return pkgerrors.Errorf("Given channel was not found:") + } + instca.chans[i] = instca.chans[len(instca.chans)-1] + instca.chans = instca.chans[:len(instca.chans)-1] + instca.mutex.Unlock() + + return nil +} + +func waitForDone(ac appcontext.AppContext) { + count := 0 + for { + time.Sleep(1 * time.Second) + count++ + if count == 60*60 { + logutils.Info("Wait for done watcher running..", logutils.Fields{}) + count = 0 + } + acStatus, _ := getAppContextStatus(ac) + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiated || + acStatus.Status == appcontext.AppContextStatusEnum.InstantiateFailed { + return + } + } + return +} + +func kickoffRetryWatcher(instca *CompositeAppContext, ac appcontext.AppContext, acStatus appcontext.AppContextStatus, wg *errgroup.Group) { + + wg.Go(func() error { + + var count int + + count = 0 + for { + time.Sleep(1 * time.Second) + count++ + if count == 60*60 { + logutils.Info("Retry watcher running..", logutils.Fields{}) + count = 0 + } + + cStatus, err := getAppContextStatus(ac) + if err != nil { + logutils.Error("Failed to get the app context status", logutils.Fields{ + "error": err, + }) + return err + } + flag, err := getAppContextFlag(ac) + if err != nil { + logutils.Error("Failed to get the stop flag", logutils.Fields{ + "error": err, + }) + return err + } else { + if flag == true { + instca.mutex.Lock() + for i := 0; i < len(instca.chans); i++ { + instca.chans[i] <- true + logutils.Info("kickoffRetryWatcher - send an exit message", logutils.Fields{}) + } + instca.mutex.Unlock() + break + } + } + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if cStatus.Status == appcontext.AppContextStatusEnum.Instantiated || + cStatus.Status == appcontext.AppContextStatusEnum.InstantiateFailed { + break + } + } else { + if cStatus.Status == appcontext.AppContextStatusEnum.Terminated || + cStatus.Status == appcontext.AppContextStatusEnum.TerminateFailed { + break + } + } + + } + return nil + }) + +} + +func getAppContextFlag(ac appcontext.AppContext) (bool, error) { + h, err := ac.GetCompositeAppHandle() + if err != nil { + return false, err + } + sh, err := ac.GetLevelHandle(h, "stopflag") + if sh == nil { + return false, err + } else { + v, err := ac.GetValue(sh) + if err != nil { + return false, err + } else { + return v.(bool), nil + } + } +} + +func updateAppContextFlag(cid interface{}, sf bool) error { + ac := appcontext.AppContext{} + _, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + hc, err := ac.GetCompositeAppHandle() + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(hc, "stopflag") + if sh == nil { + _, err = ac.AddLevelValue(hc, "stopflag", sf) + } else { + err = ac.UpdateValue(sh, sf) + } + if err != nil { + return err + } + return nil +} + +func applyFnComApp(instca *CompositeAppContext, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error { + con := connector.Init(instca.cid) //Cleanup defer con.RemoveClient() ac := appcontext.AppContext{} - h, err := ac.LoadAppContext(cid) + h, err := ac.LoadAppContext(instca.cid) if err != nil { return err } + // if terminating, wait for all retrying instantiate threads to exit + if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + waitForDone(ac) + err := updateAppContextFlag(instca.cid, false) + if err != nil { + return err + } + } + // initialize appcontext status err = initializeAppContextStatus(ac, acStatus) if err != nil { @@ -375,6 +685,8 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, }) id, _ := ac.GetCompositeAppHandle() g, _ := errgroup.WithContext(context.Background()) + wg, _ := errgroup.WithContext(context.Background()) + kickoffRetryWatcher(instca, ac, acStatus, wg) // Iterate over all the subapps for _, app := range appList["apporder"] { appName := app @@ -414,8 +726,13 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, json.Unmarshal([]byte(resorder.(string)), &aov) // Keep retrying for reachability for { + done := allResourcesDone(ac, appName, cluster, aov) + if done { + break + } + // Wait for cluster to be reachable - err = waitForClusterReady(c, cluster) + err := waitForClusterReady(instca, ac, c, appName, cluster, aov) if err != nil { // TODO: Add error handling return err @@ -479,19 +796,39 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err}) return err } + if err := wg.Wait(); err != nil { + logutils.Error("Encountered error in watcher thread", logutils.Fields{"error": err}) + return err + } return nil } // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating}, + instca.cid = cid + instca.chans = []chan bool{} + instca.mutex = sync.Mutex{} + err := updateAppContextFlag(cid, false) + if err != nil { + logutils.Error("Encountered error updating AppContext flag", logutils.Fields{"error": err}) + return err + } + go applyFnComApp(instca, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating}, instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating}, + instca.cid = cid + instca.chans = []chan bool{} + instca.mutex = sync.Mutex{} + err := updateAppContextFlag(cid, true) + if err != nil { + logutils.Error("Encountered error updating AppContext flag", logutils.Fields{"error": err}) + return err + } + go applyFnComApp(instca, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating}, terminateResource, deleteStatusTracker, false) return nil } |