summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/context
diff options
context:
space:
mode:
Diffstat (limited to 'src/rsync/pkg/context')
-rw-r--r--src/rsync/pkg/context/context.go303
1 files changed, 244 insertions, 59 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index cc7773b8..f77482e6 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -20,11 +20,12 @@ import (
"context"
"encoding/json"
"fmt"
- "log"
"strings"
+ "time"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus"
kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
@@ -38,40 +39,49 @@ type CompositeAppContext struct {
cid interface{}
}
-func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) {
+func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) {
var byteRes []byte
rh, err := ac.GetResourceHandle(app, cluster, name)
if err != nil {
- return nil, err
+ return nil, nil, err
+ }
+ sh, err := ac.GetLevelHandle(rh, "status")
+ if err != nil {
+ return nil, nil, err
}
resval, err := ac.GetValue(rh)
if err != nil {
- return nil, err
+ return nil, sh, err
}
if resval != "" {
result := strings.Split(name, "+")
if result[0] == "" {
- return nil, pkgerrors.Errorf("Resource name is nil %s:", name)
+ return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name)
}
byteRes = []byte(fmt.Sprintf("%v", resval.(interface{})))
} else {
- return nil, pkgerrors.Errorf("Resource value is nil %s", name)
+ return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name)
}
- return byteRes, nil
+ return byteRes, sh, nil
}
func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
- res, err := getRes(ac, name, app, cluster)
+ res, sh, err := getRes(ac, name, app, cluster)
if err != nil {
+ if sh != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+ }
return err
}
if err := c.Delete(res); err != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
logutils.Error("Failed to delete res", logutils.Fields{
"error": err,
"resource": name,
})
return err
}
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted})
logutils.Info("Deleted::", logutils.Fields{
"cluster": cluster,
"resource": name,
@@ -80,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri
}
func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
- res, err := getRes(ac, name, app, cluster)
+ res, sh, err := getRes(ac, name, app, cluster)
if err != nil {
+ if sh != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+ }
return err
}
//Decode the yaml to create a runtime.Object
@@ -116,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
return err
}
if err := c.Apply(b); err != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
logutils.Error("Failed to apply res", logutils.Fields{
"error": err,
"resource": name,
})
return err
}
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied})
logutils.Info("Installed::", logutils.Fields{
"cluster": cluster,
"resource": name,
@@ -129,6 +144,116 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
return nil
}
+// Wait for 2 secs
+const waitTime = 2
+
+func waitForClusterReady(c *kubeclient.Client, cluster string) error {
+ for {
+ if err := c.IsReachable(); err != nil {
+ // TODO: Add more realistic error checking
+ // TODO: Add Incremental wait logic here
+ time.Sleep(waitTime * time.Second)
+ } else {
+ break
+ }
+ }
+ logutils.Info("Cluster is reachable::", logutils.Fields{
+ "cluster": cluster,
+ })
+ return nil
+}
+
+// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext
+func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error {
+ h, err := ac.GetCompositeAppHandle()
+ if err != nil {
+ return err
+ }
+ sh, err := ac.GetLevelHandle(h, "status")
+ if sh == nil {
+ _, err = ac.AddLevelValue(h, "status", acStatus)
+ } else {
+ err = ac.UpdateValue(sh, acStatus)
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext
+func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error {
+ statusPending := resourcestatus.ResourceStatus{
+ Status: resourcestatus.RsyncStatusEnum.Pending,
+ }
+ statusDeleted := resourcestatus.ResourceStatus{
+ Status: resourcestatus.RsyncStatusEnum.Deleted,
+ }
+
+ appsOrder, err := ac.GetAppInstruction("order")
+ if err != nil {
+ return err
+ }
+ var appList map[string][]string
+ json.Unmarshal([]byte(appsOrder.(string)), &appList)
+
+ for _, app := range appList["apporder"] {
+ clusterNames, err := ac.GetClusterNames(app)
+ if err != nil {
+ return err
+ }
+ for k := 0; k < len(clusterNames); k++ {
+ cluster := clusterNames[k]
+ resorder, err := ac.GetResourceInstruction(app, cluster, "order")
+ if err != nil {
+ return err
+ }
+ var aov map[string][]string
+ json.Unmarshal([]byte(resorder.(string)), &aov)
+ for _, res := range aov["resorder"] {
+ rh, err := ac.GetResourceHandle(app, cluster, res)
+ if err != nil {
+ return err
+ }
+ sh, err := ac.GetLevelHandle(rh, "status")
+ if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+ if sh == nil {
+ _, err = ac.AddLevelValue(rh, "status", statusPending)
+ } else {
+ err = ac.UpdateStatusValue(sh, statusPending)
+ }
+ if err != nil {
+ return err
+ }
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+ if sh == nil {
+ _, err = ac.AddLevelValue(rh, "status", statusDeleted)
+ } else {
+ s, err := ac.GetValue(sh)
+ if err != nil {
+ return err
+ }
+ rStatus := resourcestatus.ResourceStatus{}
+ js, _ := json.Marshal(s)
+ json.Unmarshal(js, &rStatus)
+ if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied {
+ err = ac.UpdateStatusValue(sh, statusPending)
+ } else {
+ err = ac.UpdateStatusValue(sh, statusDeleted)
+ }
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ return pkgerrors.Errorf("Error intializing AppContext Resource Statuses")
+ }
+ }
+ }
+ }
+ return nil
+}
+
func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
b, err := status.GetStatusCR(label)
@@ -139,6 +264,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st
})
return err
}
+ // TODO: Check reachability?
if err = c.Apply(b); err != nil {
logutils.Error("Failed to apply status tracker", logutils.Fields{
"error": err,
@@ -181,17 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label
return nil
}
+func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error {
+ sh, err := ac.GetLevelHandle(handle, "status")
+ if err != nil {
+ return err
+ }
+ s, err := ac.GetValue(sh)
+ if err != nil {
+ return err
+ }
+ acStatus := appcontext.AppContextStatus{}
+ js, _ := json.Marshal(s)
+ json.Unmarshal(js, &acStatus)
+
+ if failure {
+ acStatus.Status = appcontext.AppContextStatusEnum.Failed
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+ acStatus.Status = appcontext.AppContextStatusEnum.Instantiated
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+ acStatus.Status = appcontext.AppContextStatusEnum.Terminated
+ } else {
+ return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus)
+ }
+
+ err = ac.UpdateValue(sh, acStatus)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
-func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error {
+func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error {
+ con := connector.Init(cid)
+ //Cleanup
+ defer con.RemoveClient()
ac := appcontext.AppContext{}
- g, _ := errgroup.WithContext(context.Background())
- _, err := ac.LoadAppContext(cid)
+ h, err := ac.LoadAppContext(cid)
+ if err != nil {
+ return err
+ }
+
+ // initialize appcontext status
+ err = initializeAppContextStatus(ac, acStatus)
+ if err != nil {
+ return err
+ }
+
+ // initialize the resource status values before proceeding with the function
+ err = initializeResourceStatus(ac, acStatus)
if err != nil {
return err
}
+
appsOrder, err := ac.GetAppInstruction("order")
if err != nil {
return err
@@ -203,9 +374,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
"string": appList,
})
id, _ := ac.GetCompositeAppHandle()
-
+ g, _ := errgroup.WithContext(context.Background())
+ // Iterate over all the subapps
for _, app := range appList["apporder"] {
-
appName := app
results := strings.Split(id.(string), "/")
label := results[2] + "-" + app
@@ -214,14 +385,17 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
if err != nil {
return err
}
- rg, _ := errgroup.WithContext(context.Background())
+ // Iterate over all clusters
for k := 0; k < len(clusterNames); k++ {
cluster := clusterNames[k]
err = status.StartClusterWatcher(cluster)
if err != nil {
- log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
+ logutils.Error("Error starting Cluster Watcher", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ })
}
- rg.Go(func() error {
+ g.Go(func() error {
c, err := con.GetClient(cluster)
if err != nil {
logutils.Error("Error in creating kubeconfig client", logutils.Fields{
@@ -238,75 +412,86 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
}
var aov map[string][]string
json.Unmarshal([]byte(resorder.(string)), &aov)
- for i, res := range aov["resorder"] {
- err = f(ac, c, res, appName, cluster, label)
+ // Keep retrying for reachability
+ for {
+ // Wait for cluster to be reachable
+ err = waitForClusterReady(c, cluster)
if err != nil {
- logutils.Error("Error in resource %s: %v", logutils.Fields{
- "error": err,
- "cluster": cluster,
- "resource": res,
- })
- if breakonError {
- // handle status tracking before exiting if at least one resource got handled
- if i > 0 {
- serr := sfn(c, appName, cluster, label)
- if serr != nil {
- logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ // TODO: Add error handling
+ return err
+ }
+ reachable := true
+ // Handle all resources in order
+ for i, res := range aov["resorder"] {
+ err = f(ac, c, res, appName, cluster, label)
+ if err != nil {
+ logutils.Error("Error in resource %s: %v", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "resource": res,
+ })
+ // If failure is due to reachability issues start retrying
+ if err = c.IsReachable(); err != nil {
+ reachable = false
+ break
+ }
+ if breakonError {
+ // handle status tracking before exiting if at least one resource got handled
+ if i > 0 {
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
}
+ return err
}
- return err
}
}
- }
- serr := sfn(c, appName, cluster, label)
- if serr != nil {
- logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ // Check if the break from loop due to reachabilty issues
+ if reachable != false {
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
+ // Done processing cluster without errors
+ return nil
+ }
}
return nil
})
}
- if err := rg.Wait(); err != nil {
- logutils.Error("Encountered error in App cluster", logutils.Fields{
- "error": err,
- })
- return err
- }
return nil
})
}
+ // Wait for all subtasks to complete
if err := g.Wait(); err != nil {
+ uperr := updateEndingAppContextStatus(ac, h, true)
+ if uperr != nil {
+ logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr})
+ }
logutils.Error("Encountered error", logutils.Fields{
"error": err,
})
return err
}
+ err = updateEndingAppContextStatus(ac, h, false)
+ if err != nil {
+ logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err})
+ return err
+ }
return nil
}
// InstantiateComApp Instantiate Apps in Composite App
func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
- con := connector.Init(cid)
- err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true)
- if err != nil {
- logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
- return err
- }
- //Cleanup
- con.RemoveClient()
+ go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating},
+ instantiateResource, addStatusTracker, true)
return nil
}
// TerminateComApp Terminates Apps in Composite App
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
- con := connector.Init(cid)
- err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false)
- if err != nil {
- logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
- "error": err,
- })
- return err
- }
- //Cleanup
- con.RemoveClient()
+ go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating},
+ terminateResource, deleteStatusTracker, false)
return nil
}