diff options
Diffstat (limited to 'src/rsync')
-rw-r--r-- | src/rsync/go.mod | 3 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 193 | ||||
-rw-r--r-- | src/rsync/pkg/grpc/installappserver/installappserver.go | 13 | ||||
-rw-r--r-- | src/rsync/pkg/status/status.go | 12 |
4 files changed, 191 insertions, 30 deletions
diff --git a/src/rsync/go.mod b/src/rsync/go.mod index 2072510a..973895a3 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -9,10 +9,9 @@ require ( github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 github.com/jonboulle/clockwork v0.1.0 - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000 github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200708223327-9a9a6aedbd7a - github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 + github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200721211210-783ed87fb39a //github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.5.0 diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index 2fadfd00..f77482e6 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -20,12 +20,12 @@ import ( "context" "encoding/json" "fmt" - "log" "strings" "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus" kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" @@ -39,40 +39,49 @@ type CompositeAppContext struct { cid interface{} } -func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) { +func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) { var byteRes []byte rh, err := ac.GetResourceHandle(app, cluster, name) if err != nil { - return nil, err + return nil, nil, err + } + sh, err := ac.GetLevelHandle(rh, "status") + if err != nil { + return nil, nil, err } resval, err := ac.GetValue(rh) if err != nil { - return nil, err + return nil, sh, err } if resval != "" { result := strings.Split(name, "+") if result[0] == "" { - return nil, pkgerrors.Errorf("Resource name is nil %s:", name) + return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name) } byteRes = []byte(fmt.Sprintf("%v", resval.(interface{}))) } else { - return nil, pkgerrors.Errorf("Resource value is nil %s", name) + return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name) } - return byteRes, nil + return byteRes, sh, nil } func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } if err := c.Delete(res); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to delete res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted}) logutils.Info("Deleted::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -81,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri } func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } //Decode the yaml to create a runtime.Object @@ -117,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return err } if err := c.Apply(b); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to apply res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied}) logutils.Info("Installed::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -149,6 +163,97 @@ func waitForClusterReady(c *kubeclient.Client, cluster string) error { return nil } +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + h, err := ac.GetCompositeAppHandle() + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(h, "status") + if sh == nil { + _, err = ac.AddLevelValue(h, "status", acStatus) + } else { + err = ac.UpdateValue(sh, acStatus) + } + if err != nil { + return err + } + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + statusPending := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Pending, + } + statusDeleted := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Deleted, + } + + appsOrder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + + for _, app := range appList["apporder"] { + clusterNames, err := ac.GetClusterNames(app) + if err != nil { + return err + } + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + resorder, err := ac.GetResourceInstruction(app, cluster, "order") + if err != nil { + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for _, res := range aov["resorder"] { + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(rh, "status") + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusPending) + } + if err != nil { + return err + } + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusDeleted) + } else { + s, err := ac.GetValue(sh) + if err != nil { + return err + } + rStatus := resourcestatus.ResourceStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &rStatus) + if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied { + err = ac.UpdateStatusValue(sh, statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusDeleted) + } + if err != nil { + return err + } + } + } else { + return pkgerrors.Errorf("Error intializing AppContext Resource Statuses") + } + } + } + } + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -202,20 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label return nil } +func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error { + sh, err := ac.GetLevelHandle(handle, "status") + if err != nil { + return err + } + s, err := ac.GetValue(sh) + if err != nil { + return err + } + acStatus := appcontext.AppContextStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &acStatus) + + if failure { + acStatus.Status = appcontext.AppContextStatusEnum.Failed + } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + acStatus.Status = appcontext.AppContextStatusEnum.Instantiated + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + acStatus.Status = appcontext.AppContextStatusEnum.Terminated + } else { + return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus) + } + + err = ac.UpdateValue(sh, acStatus) + if err != nil { + return err + } + return nil +} + type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error { - +func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error { con := connector.Init(cid) //Cleanup defer con.RemoveClient() ac := appcontext.AppContext{} - _, err := ac.LoadAppContext(cid) + h, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + + // initialize appcontext status + err = initializeAppContextStatus(ac, acStatus) + if err != nil { + return err + } + + // initialize the resource status values before proceeding with the function + err = initializeResourceStatus(ac, acStatus) if err != nil { return err } + appsOrder, err := ac.GetAppInstruction("order") if err != nil { return err @@ -243,7 +390,10 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error cluster := clusterNames[k] err = status.StartClusterWatcher(cluster) if err != nil { - log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) + logutils.Error("Error starting Cluster Watcher", logutils.Fields{ + "error": err, + "cluster": cluster, + }) } g.Go(func() error { c, err := con.GetClient(cluster) @@ -315,24 +465,33 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error } // Wait for all subtasks to complete if err := g.Wait(); err != nil { + uperr := updateEndingAppContextStatus(ac, h, true) + if uperr != nil { + logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr}) + } logutils.Error("Encountered error", logutils.Fields{ "error": err, }) return err } + err = updateEndingAppContextStatus(ac, h, false) + if err != nil { + logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err}) + return err + } return nil } // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - // Start handling and return grpc immediately - go applyFnComApp(cid, instantiateResource, addStatusTracker, true) + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating}, + instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - // Start handling and return grpc immediately - go applyFnComApp(cid, terminateResource, deleteStatusTracker, true) + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating}, + terminateResource, deleteStatusTracker, false) return nil } diff --git a/src/rsync/pkg/grpc/installappserver/installappserver.go b/src/rsync/pkg/grpc/installappserver/installappserver.go index 68118ade..d70000c0 100644 --- a/src/rsync/pkg/grpc/installappserver/installappserver.go +++ b/src/rsync/pkg/grpc/installappserver/installappserver.go @@ -17,8 +17,9 @@ import ( "context" "encoding/json" "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/context" + "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" ) type installappServer struct { @@ -29,17 +30,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst installAppReq, _ := json.Marshal(req) log.Println("GRPC Server received installAppRequest: ", string(installAppReq)) - // Try instantiate the comp app + // Try instantiate the comp app instca := con.CompositeAppContext{} - err := instca.InstantiateComApp(req.GetAppContext()) - if err != nil { - log.Println("Instantiation failed: " + err.Error()) + err := instca.InstantiateComApp(req.GetAppContext()) + if err != nil { + log.Println("Instantiation failed: " + err.Error()) err := instca.TerminateComApp(req.GetAppContext()) if err != nil { log.Println("Termination failed: " + err.Error()) } return &installapp.InstallAppResponse{AppContextInstalled: false}, err - } + } return &installapp.InstallAppResponse{AppContextInstalled: true}, nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go index 8c1e12be..74334278 100644 --- a/src/rsync/pkg/status/status.go +++ b/src/rsync/pkg/status/status.go @@ -81,15 +81,17 @@ func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleS return } + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Error getting cluster handle::", err) + return + } // Get the handle for the context/app/cluster status object - handle, _ := ac.GetStatusHandle(result[1], clusterId) + handle, _ := ac.GetLevelHandle(chandle, "status") // If status handle was not found, then create the status object in the appcontext if handle == nil { - chandle, err := ac.GetClusterHandle(result[1], clusterId) - if err == nil { - ac.AddStatus(chandle, string(vjson)) - } + ac.AddLevelValue(chandle, "status", string(vjson)) } else { ac.UpdateStatusValue(handle, string(vjson)) } |