diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rsync/go.mod | 3 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 122 |
2 files changed, 76 insertions, 49 deletions
diff --git a/src/rsync/go.mod b/src/rsync/go.mod index 18fef6be..2072510a 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( //client github.com/evanphx/json-patch v4.5.0+incompatible // indirect + github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 github.com/jonboulle/clockwork v0.1.0 @@ -29,8 +30,8 @@ require ( replace ( github.com/onap/multicloud-k8s/src/clm => ../clm - github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator github.com/onap/multicloud-k8s/src/monitor => ../monitor + github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator k8s.io/api => k8s.io/api v0.17.3 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery => k8s.io/apimachinery v0.17.3 diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index cc7773b8..2fadfd00 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" @@ -129,6 +130,25 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return nil } +// Wait for 2 secs +const waitTime = 2 + +func waitForClusterReady(c *kubeclient.Client, cluster string) error { + for { + if err := c.IsReachable(); err != nil { + // TODO: Add more realistic error checking + // TODO: Add Incremental wait logic here + time.Sleep(waitTime * time.Second) + } else { + break + } + } + logutils.Info("Cluster is reachable::", logutils.Fields{ + "cluster": cluster, + }) + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -139,6 +159,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st }) return err } + // TODO: Check reachability? if err = c.Apply(b); err != nil { logutils.Error("Failed to apply status tracker", logutils.Fields{ "error": err, @@ -185,9 +206,12 @@ type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, ap type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error { +func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error { + + con := connector.Init(cid) + //Cleanup + defer con.RemoveClient() ac := appcontext.AppContext{} - g, _ := errgroup.WithContext(context.Background()) _, err := ac.LoadAppContext(cid) if err != nil { return err @@ -203,9 +227,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn "string": appList, }) id, _ := ac.GetCompositeAppHandle() - + g, _ := errgroup.WithContext(context.Background()) + // Iterate over all the subapps for _, app := range appList["apporder"] { - appName := app results := strings.Split(id.(string), "/") label := results[2] + "-" + app @@ -214,14 +238,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn if err != nil { return err } - rg, _ := errgroup.WithContext(context.Background()) + // Iterate over all clusters for k := 0; k < len(clusterNames); k++ { cluster := clusterNames[k] err = status.StartClusterWatcher(cluster) if err != nil { log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) } - rg.Go(func() error { + g.Go(func() error { c, err := con.GetClient(cluster) if err != nil { logutils.Error("Error in creating kubeconfig client", logutils.Fields{ @@ -238,42 +262,58 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn } var aov map[string][]string json.Unmarshal([]byte(resorder.(string)), &aov) - for i, res := range aov["resorder"] { - err = f(ac, c, res, appName, cluster, label) + // Keep retrying for reachability + for { + // Wait for cluster to be reachable + err = waitForClusterReady(c, cluster) if err != nil { - logutils.Error("Error in resource %s: %v", logutils.Fields{ - "error": err, - "cluster": cluster, - "resource": res, - }) - if breakonError { - // handle status tracking before exiting if at least one resource got handled - if i > 0 { - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // TODO: Add error handling + return err + } + reachable := true + // Handle all resources in order + for i, res := range aov["resorder"] { + err = f(ac, c, res, appName, cluster, label) + if err != nil { + logutils.Error("Error in resource %s: %v", logutils.Fields{ + "error": err, + "cluster": cluster, + "resource": res, + }) + // If failure is due to reachability issues start retrying + if err = c.IsReachable(); err != nil { + reachable = false + break + } + if breakonError { + // handle status tracking before exiting if at least one resource got handled + if i > 0 { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } } + return err } - return err } } - } - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // Check if the break from loop due to reachabilty issues + if reachable != false { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } + // Done processing cluster without errors + return nil + } } return nil }) } - if err := rg.Wait(); err != nil { - logutils.Error("Encountered error in App cluster", logutils.Fields{ - "error": err, - }) - return err - } return nil }) } + // Wait for all subtasks to complete if err := g.Wait(); err != nil { logutils.Error("Encountered error", logutils.Fields{ "error": err, @@ -285,28 +325,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true) - if err != nil { - logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err}) - return err - } - //Cleanup - con.RemoveClient() + // Start handling and return grpc immediately + go applyFnComApp(cid, instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false) - if err != nil { - logutils.Error("TerminateComApp unsuccessful", logutils.Fields{ - "error": err, - }) - return err - } - //Cleanup - con.RemoveClient() + // Start handling and return grpc immediately + go applyFnComApp(cid, terminateResource, deleteStatusTracker, true) return nil } |