summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg
diff options
context:
space:
mode:
authorEric Multanen <eric.w.multanen@intel.com>2020-08-07 12:04:15 -0700
committerEric Multanen <eric.w.multanen@intel.com>2020-08-11 19:30:59 -0700
commit709d6d17a3b2f8bc9d46034295bd7c5a7fb76107 (patch)
treec028ad152f5cf50e4991ba1388561b2c83f1fca8 /src/rsync/pkg
parente7061c31f693f0ee60040a67baaa3935c64786cb (diff)
Add appcontext state, status and resource status
Add support in the AppContext for managing an AppContext (composite app level) status value. Also adds support for tracking rsync status at the resource level. A mechanism for tracking history at the controlling resource level (i.e. DeploymentGroupIntnt or Cluster) is added, in part, so that all AppContexts associated can be deleted when the resource is eventually deleted. Issue-ID: MULTICLOUD-1042 Change-Id: I3d0a9a97ea45ca11f9f873104476e4b67521e56a Signed-off-by: Eric Multanen <eric.w.multanen@intel.com>
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r--src/rsync/pkg/context/context.go193
-rw-r--r--src/rsync/pkg/grpc/installappserver/installappserver.go13
-rw-r--r--src/rsync/pkg/status/status.go12
3 files changed, 190 insertions, 28 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index 2fadfd00..f77482e6 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -20,12 +20,12 @@ import (
"context"
"encoding/json"
"fmt"
- "log"
"strings"
"time"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus"
kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
@@ -39,40 +39,49 @@ type CompositeAppContext struct {
cid interface{}
}
-func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) {
+func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) {
var byteRes []byte
rh, err := ac.GetResourceHandle(app, cluster, name)
if err != nil {
- return nil, err
+ return nil, nil, err
+ }
+ sh, err := ac.GetLevelHandle(rh, "status")
+ if err != nil {
+ return nil, nil, err
}
resval, err := ac.GetValue(rh)
if err != nil {
- return nil, err
+ return nil, sh, err
}
if resval != "" {
result := strings.Split(name, "+")
if result[0] == "" {
- return nil, pkgerrors.Errorf("Resource name is nil %s:", name)
+ return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name)
}
byteRes = []byte(fmt.Sprintf("%v", resval.(interface{})))
} else {
- return nil, pkgerrors.Errorf("Resource value is nil %s", name)
+ return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name)
}
- return byteRes, nil
+ return byteRes, sh, nil
}
func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
- res, err := getRes(ac, name, app, cluster)
+ res, sh, err := getRes(ac, name, app, cluster)
if err != nil {
+ if sh != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+ }
return err
}
if err := c.Delete(res); err != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
logutils.Error("Failed to delete res", logutils.Fields{
"error": err,
"resource": name,
})
return err
}
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted})
logutils.Info("Deleted::", logutils.Fields{
"cluster": cluster,
"resource": name,
@@ -81,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri
}
func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
- res, err := getRes(ac, name, app, cluster)
+ res, sh, err := getRes(ac, name, app, cluster)
if err != nil {
+ if sh != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
+ }
return err
}
//Decode the yaml to create a runtime.Object
@@ -117,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
return err
}
if err := c.Apply(b); err != nil {
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed})
logutils.Error("Failed to apply res", logutils.Fields{
"error": err,
"resource": name,
})
return err
}
+ ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied})
logutils.Info("Installed::", logutils.Fields{
"cluster": cluster,
"resource": name,
@@ -149,6 +163,97 @@ func waitForClusterReady(c *kubeclient.Client, cluster string) error {
return nil
}
+// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext
+func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error {
+ h, err := ac.GetCompositeAppHandle()
+ if err != nil {
+ return err
+ }
+ sh, err := ac.GetLevelHandle(h, "status")
+ if sh == nil {
+ _, err = ac.AddLevelValue(h, "status", acStatus)
+ } else {
+ err = ac.UpdateValue(sh, acStatus)
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext
+func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error {
+ statusPending := resourcestatus.ResourceStatus{
+ Status: resourcestatus.RsyncStatusEnum.Pending,
+ }
+ statusDeleted := resourcestatus.ResourceStatus{
+ Status: resourcestatus.RsyncStatusEnum.Deleted,
+ }
+
+ appsOrder, err := ac.GetAppInstruction("order")
+ if err != nil {
+ return err
+ }
+ var appList map[string][]string
+ json.Unmarshal([]byte(appsOrder.(string)), &appList)
+
+ for _, app := range appList["apporder"] {
+ clusterNames, err := ac.GetClusterNames(app)
+ if err != nil {
+ return err
+ }
+ for k := 0; k < len(clusterNames); k++ {
+ cluster := clusterNames[k]
+ resorder, err := ac.GetResourceInstruction(app, cluster, "order")
+ if err != nil {
+ return err
+ }
+ var aov map[string][]string
+ json.Unmarshal([]byte(resorder.(string)), &aov)
+ for _, res := range aov["resorder"] {
+ rh, err := ac.GetResourceHandle(app, cluster, res)
+ if err != nil {
+ return err
+ }
+ sh, err := ac.GetLevelHandle(rh, "status")
+ if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+ if sh == nil {
+ _, err = ac.AddLevelValue(rh, "status", statusPending)
+ } else {
+ err = ac.UpdateStatusValue(sh, statusPending)
+ }
+ if err != nil {
+ return err
+ }
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+ if sh == nil {
+ _, err = ac.AddLevelValue(rh, "status", statusDeleted)
+ } else {
+ s, err := ac.GetValue(sh)
+ if err != nil {
+ return err
+ }
+ rStatus := resourcestatus.ResourceStatus{}
+ js, _ := json.Marshal(s)
+ json.Unmarshal(js, &rStatus)
+ if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied {
+ err = ac.UpdateStatusValue(sh, statusPending)
+ } else {
+ err = ac.UpdateStatusValue(sh, statusDeleted)
+ }
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ return pkgerrors.Errorf("Error intializing AppContext Resource Statuses")
+ }
+ }
+ }
+ }
+ return nil
+}
+
func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
b, err := status.GetStatusCR(label)
@@ -202,20 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label
return nil
}
+func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error {
+ sh, err := ac.GetLevelHandle(handle, "status")
+ if err != nil {
+ return err
+ }
+ s, err := ac.GetValue(sh)
+ if err != nil {
+ return err
+ }
+ acStatus := appcontext.AppContextStatus{}
+ js, _ := json.Marshal(s)
+ json.Unmarshal(js, &acStatus)
+
+ if failure {
+ acStatus.Status = appcontext.AppContextStatusEnum.Failed
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+ acStatus.Status = appcontext.AppContextStatusEnum.Instantiated
+ } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+ acStatus.Status = appcontext.AppContextStatusEnum.Terminated
+ } else {
+ return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus)
+ }
+
+ err = ac.UpdateValue(sh, acStatus)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
-func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error {
-
+func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error {
con := connector.Init(cid)
//Cleanup
defer con.RemoveClient()
ac := appcontext.AppContext{}
- _, err := ac.LoadAppContext(cid)
+ h, err := ac.LoadAppContext(cid)
+ if err != nil {
+ return err
+ }
+
+ // initialize appcontext status
+ err = initializeAppContextStatus(ac, acStatus)
+ if err != nil {
+ return err
+ }
+
+ // initialize the resource status values before proceeding with the function
+ err = initializeResourceStatus(ac, acStatus)
if err != nil {
return err
}
+
appsOrder, err := ac.GetAppInstruction("order")
if err != nil {
return err
@@ -243,7 +390,10 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error
cluster := clusterNames[k]
err = status.StartClusterWatcher(cluster)
if err != nil {
- log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
+ logutils.Error("Error starting Cluster Watcher", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ })
}
g.Go(func() error {
c, err := con.GetClient(cluster)
@@ -315,24 +465,33 @@ func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error
}
// Wait for all subtasks to complete
if err := g.Wait(); err != nil {
+ uperr := updateEndingAppContextStatus(ac, h, true)
+ if uperr != nil {
+ logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr})
+ }
logutils.Error("Encountered error", logutils.Fields{
"error": err,
})
return err
}
+ err = updateEndingAppContextStatus(ac, h, false)
+ if err != nil {
+ logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err})
+ return err
+ }
return nil
}
// InstantiateComApp Instantiate Apps in Composite App
func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
- // Start handling and return grpc immediately
- go applyFnComApp(cid, instantiateResource, addStatusTracker, true)
+ go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating},
+ instantiateResource, addStatusTracker, true)
return nil
}
// TerminateComApp Terminates Apps in Composite App
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
- // Start handling and return grpc immediately
- go applyFnComApp(cid, terminateResource, deleteStatusTracker, true)
+ go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating},
+ terminateResource, deleteStatusTracker, false)
return nil
}
diff --git a/src/rsync/pkg/grpc/installappserver/installappserver.go b/src/rsync/pkg/grpc/installappserver/installappserver.go
index 68118ade..d70000c0 100644
--- a/src/rsync/pkg/grpc/installappserver/installappserver.go
+++ b/src/rsync/pkg/grpc/installappserver/installappserver.go
@@ -17,8 +17,9 @@ import (
"context"
"encoding/json"
"log"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
+
con "github.com/onap/multicloud-k8s/src/rsync/pkg/context"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
)
type installappServer struct {
@@ -29,17 +30,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst
installAppReq, _ := json.Marshal(req)
log.Println("GRPC Server received installAppRequest: ", string(installAppReq))
- // Try instantiate the comp app
+ // Try instantiate the comp app
instca := con.CompositeAppContext{}
- err := instca.InstantiateComApp(req.GetAppContext())
- if err != nil {
- log.Println("Instantiation failed: " + err.Error())
+ err := instca.InstantiateComApp(req.GetAppContext())
+ if err != nil {
+ log.Println("Instantiation failed: " + err.Error())
err := instca.TerminateComApp(req.GetAppContext())
if err != nil {
log.Println("Termination failed: " + err.Error())
}
return &installapp.InstallAppResponse{AppContextInstalled: false}, err
- }
+ }
return &installapp.InstallAppResponse{AppContextInstalled: true}, nil
}
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
index 8c1e12be..74334278 100644
--- a/src/rsync/pkg/status/status.go
+++ b/src/rsync/pkg/status/status.go
@@ -81,15 +81,17 @@ func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleS
return
}
+ chandle, err := ac.GetClusterHandle(result[1], clusterId)
+ if err != nil {
+ logrus.Info(clusterId, "::Error getting cluster handle::", err)
+ return
+ }
// Get the handle for the context/app/cluster status object
- handle, _ := ac.GetStatusHandle(result[1], clusterId)
+ handle, _ := ac.GetLevelHandle(chandle, "status")
// If status handle was not found, then create the status object in the appcontext
if handle == nil {
- chandle, err := ac.GetClusterHandle(result[1], clusterId)
- if err == nil {
- ac.AddStatus(chandle, string(vjson))
- }
+ ac.AddLevelValue(chandle, "status", string(vjson))
} else {
ac.UpdateStatusValue(handle, string(vjson))
}