aboutsummaryrefslogtreecommitdiffstats
path: root/src/rsync
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2020-07-23 15:56:20 -0700
committerEric Multanen <eric.w.multanen@intel.com>2020-08-07 12:02:19 -0700
commite7061c31f693f0ee60040a67baaa3935c64786cb (patch)
tree50041297913d2f495955b8f1037fbac8cc5b4b40 /src/rsync
parent2f910c3b1b4370cf8018dd82836ed97c5a5e7027 (diff)
Rsync change behaviour on error handling
If error in any resource stop processing and end all goroutines. Also return gRpc call after starting the goroutine. Adds retry checks also Issue-ID: MULTICLOUD-1005 Signed-off-by: Ritu Sood <ritu.sood@intel.com> Change-Id: I1189e02f0c0426181fdc995a0c4816ceaa64ec7d
Diffstat (limited to 'src/rsync')
-rw-r--r--src/rsync/go.mod3
-rw-r--r--src/rsync/pkg/context/context.go122
2 files changed, 76 insertions, 49 deletions
diff --git a/src/rsync/go.mod b/src/rsync/go.mod
index 18fef6be..2072510a 100644
--- a/src/rsync/go.mod
+++ b/src/rsync/go.mod
@@ -5,6 +5,7 @@ go 1.13
require (
//client
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
+ github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.4.1
github.com/googleapis/gnostic v0.4.0
github.com/jonboulle/clockwork v0.1.0
@@ -29,8 +30,8 @@ require (
replace (
github.com/onap/multicloud-k8s/src/clm => ../clm
- github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator
github.com/onap/multicloud-k8s/src/monitor => ../monitor
+ github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator
k8s.io/api => k8s.io/api v0.17.3
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3
k8s.io/apimachinery => k8s.io/apimachinery v0.17.3
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index cc7773b8..2fadfd00 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -22,6 +22,7 @@ import (
"fmt"
"log"
"strings"
+ "time"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
@@ -129,6 +130,25 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
return nil
}
+// Wait for 2 secs
+const waitTime = 2
+
+func waitForClusterReady(c *kubeclient.Client, cluster string) error {
+ for {
+ if err := c.IsReachable(); err != nil {
+ // TODO: Add more realistic error checking
+ // TODO: Add Incremental wait logic here
+ time.Sleep(waitTime * time.Second)
+ } else {
+ break
+ }
+ }
+ logutils.Info("Cluster is reachable::", logutils.Fields{
+ "cluster": cluster,
+ })
+ return nil
+}
+
func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
b, err := status.GetStatusCR(label)
@@ -139,6 +159,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st
})
return err
}
+ // TODO: Check reachability?
if err = c.Apply(b); err != nil {
logutils.Error("Failed to apply status tracker", logutils.Fields{
"error": err,
@@ -185,9 +206,12 @@ type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, ap
type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
-func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error {
+func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error {
+
+ con := connector.Init(cid)
+ //Cleanup
+ defer con.RemoveClient()
ac := appcontext.AppContext{}
- g, _ := errgroup.WithContext(context.Background())
_, err := ac.LoadAppContext(cid)
if err != nil {
return err
@@ -203,9 +227,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
"string": appList,
})
id, _ := ac.GetCompositeAppHandle()
-
+ g, _ := errgroup.WithContext(context.Background())
+ // Iterate over all the subapps
for _, app := range appList["apporder"] {
-
appName := app
results := strings.Split(id.(string), "/")
label := results[2] + "-" + app
@@ -214,14 +238,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
if err != nil {
return err
}
- rg, _ := errgroup.WithContext(context.Background())
+ // Iterate over all clusters
for k := 0; k < len(clusterNames); k++ {
cluster := clusterNames[k]
err = status.StartClusterWatcher(cluster)
if err != nil {
log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
}
- rg.Go(func() error {
+ g.Go(func() error {
c, err := con.GetClient(cluster)
if err != nil {
logutils.Error("Error in creating kubeconfig client", logutils.Fields{
@@ -238,42 +262,58 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
}
var aov map[string][]string
json.Unmarshal([]byte(resorder.(string)), &aov)
- for i, res := range aov["resorder"] {
- err = f(ac, c, res, appName, cluster, label)
+ // Keep retrying for reachability
+ for {
+ // Wait for cluster to be reachable
+ err = waitForClusterReady(c, cluster)
if err != nil {
- logutils.Error("Error in resource %s: %v", logutils.Fields{
- "error": err,
- "cluster": cluster,
- "resource": res,
- })
- if breakonError {
- // handle status tracking before exiting if at least one resource got handled
- if i > 0 {
- serr := sfn(c, appName, cluster, label)
- if serr != nil {
- logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ // TODO: Add error handling
+ return err
+ }
+ reachable := true
+ // Handle all resources in order
+ for i, res := range aov["resorder"] {
+ err = f(ac, c, res, appName, cluster, label)
+ if err != nil {
+ logutils.Error("Error in resource %s: %v", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "resource": res,
+ })
+ // If failure is due to reachability issues start retrying
+ if err = c.IsReachable(); err != nil {
+ reachable = false
+ break
+ }
+ if breakonError {
+ // handle status tracking before exiting if at least one resource got handled
+ if i > 0 {
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
}
+ return err
}
- return err
}
}
- }
- serr := sfn(c, appName, cluster, label)
- if serr != nil {
- logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ // Check if the break from loop due to reachabilty issues
+ if reachable != false {
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
+ // Done processing cluster without errors
+ return nil
+ }
}
return nil
})
}
- if err := rg.Wait(); err != nil {
- logutils.Error("Encountered error in App cluster", logutils.Fields{
- "error": err,
- })
- return err
- }
return nil
})
}
+ // Wait for all subtasks to complete
if err := g.Wait(); err != nil {
logutils.Error("Encountered error", logutils.Fields{
"error": err,
@@ -285,28 +325,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
// InstantiateComApp Instantiate Apps in Composite App
func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
- con := connector.Init(cid)
- err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true)
- if err != nil {
- logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
- return err
- }
- //Cleanup
- con.RemoveClient()
+ // Start handling and return grpc immediately
+ go applyFnComApp(cid, instantiateResource, addStatusTracker, true)
return nil
}
// TerminateComApp Terminates Apps in Composite App
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
- con := connector.Init(cid)
- err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false)
- if err != nil {
- logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
- "error": err,
- })
- return err
- }
- //Cleanup
- con.RemoveClient()
+ // Start handling and return grpc immediately
+ go applyFnComApp(cid, terminateResource, deleteStatusTracker, true)
return nil
}