diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/clm/pkg/cluster/cluster.go | 36 | ||||
-rw-r--r-- | src/ncm/internal/ovncontroller/ovncontroller.go | 5 | ||||
-rw-r--r-- | src/ncm/pkg/networkintents/network.go | 16 | ||||
-rw-r--r-- | src/ncm/pkg/networkintents/providernet.go | 16 | ||||
-rw-r--r-- | src/ncm/pkg/scheduler/scheduler.go | 120 | ||||
-rw-r--r-- | src/orchestrator/pkg/appcontext/appcontext.go | 120 | ||||
-rw-r--r-- | src/orchestrator/pkg/appcontext/appcontext_test.go | 7 | ||||
-rw-r--r-- | src/orchestrator/pkg/module/deployment_intent_groups.go | 33 | ||||
-rw-r--r-- | src/orchestrator/pkg/module/instantiation.go | 147 | ||||
-rw-r--r-- | src/orchestrator/pkg/module/instantiation_appcontext_helper.go | 12 | ||||
-rw-r--r-- | src/orchestrator/pkg/resourcestatus/resourcestatus.go | 41 | ||||
-rw-r--r-- | src/orchestrator/pkg/rtcontext/rtcontext.go | 21 | ||||
-rw-r--r-- | src/orchestrator/pkg/state/state_helper.go | 48 | ||||
-rw-r--r-- | src/orchestrator/pkg/state/types.go | 12 | ||||
-rw-r--r-- | src/rsync/go.mod | 6 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 303 | ||||
-rw-r--r-- | src/rsync/pkg/grpc/installappserver/installappserver.go | 13 | ||||
-rw-r--r-- | src/rsync/pkg/status/status.go | 12 |
18 files changed, 703 insertions, 265 deletions
diff --git a/src/clm/pkg/cluster/cluster.go b/src/clm/pkg/cluster/cluster.go index 9505bd97..26a9d6df 100644 --- a/src/clm/pkg/cluster/cluster.go +++ b/src/clm/pkg/cluster/cluster.go @@ -17,6 +17,8 @@ package cluster import ( + "time" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" mtypes "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/types" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state" @@ -255,12 +257,15 @@ func (v *ClusterClient) CreateCluster(provider string, p Cluster, q ClusterConte } // Add the stateInfo record - stateInfo := state.StateInfo{ + s := state.StateInfo{} + a := state.ActionEntry{ State: state.StateEnum.Created, ContextId: "", + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagState, stateInfo) + err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagState, s) if err != nil { return Cluster{}, pkgerrors.Wrap(err, "Creating cluster StateInfo") } @@ -403,8 +408,31 @@ func (v *ClusterClient) DeleteCluster(provider, name string) error { ClusterName: name, } s, err := v.GetClusterState(provider, name) - if err == nil && s.State == state.StateEnum.Applied { - return pkgerrors.Errorf("Cluster network intents must be terminated before it can be deleted: " + name) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster: " + name) + } + + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + name) + } + + if stateVal == state.StateEnum.Applied { + return pkgerrors.Errorf("Cluster network intents must be terminated before it can be deleted " + name) + } + + // remove the app contexts associated with this cluster + if stateVal == state.StateEnum.Terminated { + for _, id := range state.GetContextIdsFromStateInfo(s) { + context, err := state.GetAppContextFromId(id) + if err != nil { + return pkgerrors.Wrap(err, "Error getting appcontext from Cluster StateInfo") + } + err = context.DeleteCompositeApp() + if err != nil { + return pkgerrors.Wrap(err, "Error deleting appcontext for Cluster") + } + } } err = db.DBconn.Remove(v.db.storeName, key) diff --git a/src/ncm/internal/ovncontroller/ovncontroller.go b/src/ncm/internal/ovncontroller/ovncontroller.go index 125ad6c7..b2fcacd5 100644 --- a/src/ncm/internal/ovncontroller/ovncontroller.go +++ b/src/ncm/internal/ovncontroller/ovncontroller.go @@ -100,7 +100,8 @@ func Apply(ctxVal interface{}, clusterProvider, cluster string) error { return nil } - clusterhandle, _ := ac.GetClusterHandle(nettypes.CONTEXT_CLUSTER_APP, clusterProvider+nettypes.SEPARATOR+cluster) + acCluster := clusterProvider + nettypes.SEPARATOR + cluster + clusterhandle, _ := ac.GetClusterHandle(nettypes.CONTEXT_CLUSTER_APP, acCluster) var orderinstr struct { Resorder []string `json:"resorder"` @@ -112,7 +113,7 @@ func Apply(ctxVal interface{}, clusterProvider, cluster string) error { for _, resource := range resources { orderinstr.Resorder = append(orderinstr.Resorder, resource.name) resdep[resource.name] = "go" - _, err = ac.AddResource(clusterhandle, resource.name, resource.value) + _, err := ac.AddResource(clusterhandle, resource.name, resource.value) if err != nil { cleanuperr := ac.DeleteCompositeApp() if cleanuperr != nil { diff --git a/src/ncm/pkg/networkintents/network.go b/src/ncm/pkg/networkintents/network.go index 58480cc8..7d6af444 100644 --- a/src/ncm/pkg/networkintents/network.go +++ b/src/ncm/pkg/networkintents/network.go @@ -95,7 +95,11 @@ func (v *NetworkClient) CreateNetwork(p Network, clusterProvider, cluster string if err != nil { return Network{}, pkgerrors.New("Unable to find the cluster") } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return Network{}, pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return Network{}, pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -107,7 +111,7 @@ func (v *NetworkClient) CreateNetwork(p Network, clusterProvider, cluster string case state.StateEnum.Instantiated: return Network{}, pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + state.StateEnum.Instantiated) default: - return Network{}, pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + s.State) + return Network{}, pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + stateVal) } //Check if this Network already exists @@ -187,7 +191,11 @@ func (v *NetworkClient) DeleteNetwork(name, clusterProvider, cluster string) err if err != nil { return pkgerrors.New("Unable to find the cluster") } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -199,7 +207,7 @@ func (v *NetworkClient) DeleteNetwork(name, clusterProvider, cluster string) err case state.StateEnum.Instantiated: return pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + state.StateEnum.Instantiated) default: - return pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + s.State) + return pkgerrors.Errorf("Cluster is in an invalid state: " + cluster + " " + stateVal) } //Construct key and tag to select the entry diff --git a/src/ncm/pkg/networkintents/providernet.go b/src/ncm/pkg/networkintents/providernet.go index dbe6e46c..5cb9c670 100644 --- a/src/ncm/pkg/networkintents/providernet.go +++ b/src/ncm/pkg/networkintents/providernet.go @@ -90,7 +90,11 @@ func (v *ProviderNetClient) CreateProviderNet(p ProviderNet, clusterProvider, cl if err != nil { return ProviderNet{}, pkgerrors.New("Unable to find the cluster") } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return ProviderNet{}, pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return ProviderNet{}, pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -102,7 +106,7 @@ func (v *ProviderNetClient) CreateProviderNet(p ProviderNet, clusterProvider, cl case state.StateEnum.Instantiated: return ProviderNet{}, pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) default: - return ProviderNet{}, pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+s.State) + return ProviderNet{}, pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } //Construct key and tag to select the entry @@ -189,7 +193,11 @@ func (v *ProviderNetClient) DeleteProviderNet(name, clusterProvider, cluster str if err != nil { return pkgerrors.New("Unable to find the cluster") } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -201,7 +209,7 @@ func (v *ProviderNetClient) DeleteProviderNet(name, clusterProvider, cluster str case state.StateEnum.Instantiated: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) default: - return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+s.State) + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } //Construct key and tag to select the entry diff --git a/src/ncm/pkg/scheduler/scheduler.go b/src/ncm/pkg/scheduler/scheduler.go index 8ced68b8..131113db 100644 --- a/src/ncm/pkg/scheduler/scheduler.go +++ b/src/ncm/pkg/scheduler/scheduler.go @@ -18,6 +18,7 @@ package scheduler import ( "encoding/json" + "time" clusterPkg "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" oc "github.com/onap/multicloud-k8s/src/ncm/internal/ovncontroller" @@ -57,6 +58,13 @@ func NewSchedulerClient() *SchedulerClient { } } +func deleteAppContext(ac appcontext.AppContext) { + err := ac.DeleteCompositeApp() + if err != nil { + log.Warn(":: Error deleting AppContext ::", log.Fields{"Error": err}) + } +} + // Apply Network Intents associated with a cluster func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) error { @@ -64,7 +72,11 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e if err != nil { return pkgerrors.Errorf("Error finding cluster: %v %v", clusterProvider, cluster) } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -76,7 +88,7 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e case state.StateEnum.Instantiated: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) default: - return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+s.State) + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } // Make an app context for the network intent resources @@ -87,19 +99,14 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e } handle, err := ac.CreateCompositeApp() if err != nil { + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error creating AppContext CompositeApp") } // Add an app (fixed value) to the app context apphandle, err := ac.AddApp(handle, nettypes.CONTEXT_CLUSTER_APP) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext CompositeApp create failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding App to AppContext") } @@ -109,28 +116,38 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e }{ []string{nettypes.CONTEXT_CLUSTER_APP}, } - jinstr, _ := json.Marshal(appinstr) + jinstr, err := json.Marshal(appinstr) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error marshalling network intent app order instruction") + } appdepinstr := struct { Appdep map[string]string `json:"appdependency"` }{ map[string]string{nettypes.CONTEXT_CLUSTER_APP: "go"}, } - jdep, _ := json.Marshal(appdepinstr) + jdep, err := json.Marshal(appdepinstr) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error marshalling network intent app dependency instruction") + } _, err = ac.AddInstruction(handle, "app", "order", string(jinstr)) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error adding network intent app order instruction") + } _, err = ac.AddInstruction(handle, "app", "dependency", string(jdep)) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error adding network intent app dependency instruction") + } // Add a cluster to the app _, err = ac.AddCluster(apphandle, clusterProvider+nettypes.SEPARATOR+cluster) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after add cluster failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding Cluster to AppContext") } @@ -140,42 +157,33 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e // their own context err = oc.Apply(ctxVal, clusterProvider, cluster) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after controller failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding Cluster to AppContext") } + // call resource synchronizer to instantiate the CRs in the cluster + err = installappclient.InvokeInstallApp(ctxVal.(string)) + if err != nil { + deleteAppContext(ac) + return err + } + // update the StateInfo in the cluster db record key := clusterPkg.ClusterKey{ ClusterProviderName: clusterProvider, ClusterName: cluster, } - stateInfo := state.StateInfo{ + a := state.ActionEntry{ State: state.StateEnum.Applied, ContextId: ctxVal.(string), + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, stateInfo) + err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after DB insert failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } - return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster: "+cluster) - } - - // call resource synchronizer to instantiate the CRs in the cluster - err = installappclient.InvokeInstallApp(ctxVal.(string)) - if err != nil { - return err + log.Warn(":: Error updating Cluster state in DB ::", log.Fields{"Error": err.Error(), "cluster": cluster, "cluster provider": clusterProvider, "AppContext": ctxVal.(string)}) + return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster after Apply on network intents: "+cluster) } return nil @@ -187,7 +195,11 @@ func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster strin if err != nil { return pkgerrors.Wrapf(err, "Error finding StateInfo for cluster: %v, %v", clusterProvider, cluster) } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { case state.StateEnum.Approved: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) case state.StateEnum.Terminated: @@ -199,36 +211,28 @@ func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster strin case state.StateEnum.Instantiated: return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) default: - return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+s.State) + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } // call resource synchronizer to terminate the CRs in the cluster - err = installappclient.InvokeUninstallApp(s.ContextId) + contextId := state.GetLastContextIdFromStateInfo(s) + err = installappclient.InvokeUninstallApp(contextId) if err != nil { return err } - // remove the app context - context, err := state.GetAppContextFromStateInfo(s) - if err != nil { - return pkgerrors.Wrap(err, "Error getting appcontext from cluster StateInfo : "+clusterProvider+" "+cluster) - } - err = context.DeleteCompositeApp() - if err != nil { - return pkgerrors.Wrap(err, "Error deleting appcontext of cluster : "+clusterProvider+" "+cluster) - } - // update StateInfo key := clusterPkg.ClusterKey{ ClusterProviderName: clusterProvider, ClusterName: cluster, } - stateInfo := state.StateInfo{ + a := state.ActionEntry{ State: state.StateEnum.Terminated, - ContextId: "", + ContextId: contextId, + TimeStamp: time.Now(), } - - err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, stateInfo) + s.Actions = append(s.Actions, a) + err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s) if err != nil { return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster: "+cluster) } diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go index cdf23bfa..db2ba432 100644 --- a/src/orchestrator/pkg/appcontext/appcontext.go +++ b/src/orchestrator/pkg/appcontext/appcontext.go @@ -34,6 +34,35 @@ type AppContext struct { rtc rtcontext.Rtcontext } +// AppContextStatus represents the current status of the appcontext +// Instantiating - instantiate has been invoked and is still in progress +// Instantiated - instantiate has completed +// PreTerminate - terminate has been invoked when in Instantiating status - need to clean up first +// Terminating - terminate has been invoked and is still in progress +// Terminated - terminate has completed +// Failed - the instantiate or terminate action has failed +type AppContextStatus struct { + Status StatusValue +} +type StatusValue string +type statuses struct { + Instantiating StatusValue + Instantiated StatusValue + PreTerminate StatusValue + Terminating StatusValue + Terminated StatusValue + Failed StatusValue +} + +var AppContextStatusEnum = &statuses{ + Instantiating: "Instantiating", + Instantiated: "Instantiated", + PreTerminate: "PreTerminate", + Terminating: "Terminating", + Terminated: "Terminated", + Failed: "Failed", +} + // CompositeAppMeta consists of projectName, CompositeAppName, // CompositeAppVersion, ReleaseName. This shall be used for // instantiation of a compositeApp @@ -99,6 +128,22 @@ func (ac *AppContext) GetCompositeAppHandle() (interface{}, error) { return h, nil } +// GetLevelHandle returns the handle for the supplied level at the given handle. +// For example, to get the handle of the 'status' level at a given handle. +func (ac *AppContext) GetLevelHandle(handle interface{}, level string) (interface{}, error) { + ach := fmt.Sprintf("%v%v/", handle, level) + hs, err := ac.rtc.RtcGetHandles(ach) + if err != nil { + return nil, err + } + for _, v := range hs { + if v == ach { + return v, nil + } + } + return nil, pkgerrors.Errorf("No handle was found for level %v", level) +} + //Add app to the context under composite app func (ac *AppContext) AddApp(handle interface{}, appname string) (interface{}, error) { h, err := ac.rtc.RtcAddLevel(handle, "app", appname) @@ -307,16 +352,7 @@ func (ac *AppContext) AddResource(handle interface{}, resname string, value inte return h, nil } -//Delete resource given the handle -func (ac *AppContext) DeleteResource(handle interface{}) error { - err := ac.rtc.RtcDeletePair(handle) - if err != nil { - return err - } - return nil -} - -//Return the hanlde for given app, cluster and resource name +//Return the handle for given app, cluster and resource name func (ac *AppContext) GetResourceHandle(appname string, clustername string, resname string) (interface{}, error) { if appname == "" { return nil, pkgerrors.Errorf("Not a valid run time context app name") @@ -343,11 +379,41 @@ func (ac *AppContext) GetResourceHandle(appname string, clustername string, resn return nil, pkgerrors.Errorf("No handle was found for the given resource") } -//Update the resource value usign the given handle +//Update the resource value using the given handle func (ac *AppContext) UpdateResourceValue(handle interface{}, value interface{}) error { return ac.rtc.RtcUpdateValue(handle, value) } +//Return the handle for given app, cluster and resource name +func (ac *AppContext) GetResourceStatusHandle(appname string, clustername string, resname string) (interface{}, error) { + if appname == "" { + return nil, pkgerrors.Errorf("Not a valid run time context app name") + } + if clustername == "" { + return nil, pkgerrors.Errorf("Not a valid run time context cluster name") + } + if resname == "" { + return nil, pkgerrors.Errorf("Not a valid run time context resource name") + } + + rh, err := ac.rtc.RtcGet() + if err != nil { + return nil, err + } + + acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/resource/" + resname + "/status/" + hs, err := ac.rtc.RtcGetHandles(acrh) + if err != nil { + return nil, err + } + for _, v := range hs { + if v == acrh { + return v, nil + } + } + return nil, pkgerrors.Errorf("No handle was found for the given resource") +} + //Add instruction under given handle and type func (ac *AppContext) AddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error) { if !(insttype == "order" || insttype == "dependency") { @@ -364,7 +430,7 @@ func (ac *AppContext) AddInstruction(handle interface{}, level string, insttype return h, nil } -//Delete instruction under gievn handle +//Delete instruction under given handle func (ac *AppContext) DeleteInstruction(handle interface{}) error { err := ac.rtc.RtcDeletePair(handle) if err != nil { @@ -414,29 +480,20 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string, return v, nil } -//AddStatus for holding status of all resources under app and cluster -// handle should be a cluster handle -func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) { - h, err := ac.rtc.RtcAddStatus(handle, value) +// AddLevelValue for holding a state object at a given level +// will make a handle with an appended "<level>/" to the key +func (ac *AppContext) AddLevelValue(handle interface{}, level string, value interface{}) (interface{}, error) { + h, err := ac.rtc.RtcAddOneLevel(handle, level, value) if err != nil { return nil, err } - log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h}) + log.Info(":: Added handle ::", log.Fields{"Handle": h}) return h, nil } -//DeleteStatus for the given the handle -func (ac *AppContext) DeleteStatus(handle interface{}) error { - err := ac.rtc.RtcDeletePair(handle) - if err != nil { - return err - } - return nil -} - -//Return the handle for status for a given app and cluster -func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) { +// GetClusterStatusHandle returns the handle for cluster status for a given app and cluster +func (ac *AppContext) GetClusterStatusHandle(appname string, clustername string) (interface{}, error) { if appname == "" { return nil, pkgerrors.Errorf("Not a valid run time context app name") } @@ -467,6 +524,11 @@ func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) e return ac.rtc.RtcUpdateValue(handle, value) } +//UpdateValue updates the state value with the given handle +func (ac *AppContext) UpdateValue(handle interface{}, value interface{}) error { + return ac.rtc.RtcUpdateValue(handle, value) +} + //Return all the handles under the composite app func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) { hs, err := ac.rtc.RtcGetHandles(handle) @@ -478,7 +540,7 @@ func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) { //Returns the value for a given handle func (ac *AppContext) GetValue(handle interface{}) (interface{}, error) { - var v string + var v interface{} err := ac.rtc.RtcGetValue(handle, &v) if err != nil { return nil, err diff --git a/src/orchestrator/pkg/appcontext/appcontext_test.go b/src/orchestrator/pkg/appcontext/appcontext_test.go index 92c43113..4d66b559 100644 --- a/src/orchestrator/pkg/appcontext/appcontext_test.go +++ b/src/orchestrator/pkg/appcontext/appcontext_test.go @@ -18,9 +18,10 @@ package appcontext import ( "fmt" - pkgerrors "github.com/pkg/errors" "strings" "testing" + + pkgerrors "github.com/pkg/errors" ) // Mock run time context @@ -145,10 +146,6 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{ return c.Err } -func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { - return nil, nil -} - func TestCreateCompositeApp(t *testing.T) { var ac = AppContext{} testCases := []struct { diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go index d017c1e9..f9829853 100644 --- a/src/orchestrator/pkg/module/deployment_intent_groups.go +++ b/src/orchestrator/pkg/module/deployment_intent_groups.go @@ -19,6 +19,7 @@ package module import ( "encoding/json" "reflect" + "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state" @@ -135,12 +136,15 @@ func (c *DeploymentIntentGroupClient) CreateDeploymentIntentGroup(d DeploymentIn } // Add the stateInfo record - stateInfo := state.StateInfo{ + s := state.StateInfo{} + a := state.ActionEntry{ State: state.StateEnum.Created, ContextId: "", + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - err = db.DBconn.Insert(c.storeName, gkey, nil, c.tagState, stateInfo) + err = db.DBconn.Insert(c.storeName, gkey, nil, c.tagState, s) if err != nil { return DeploymentIntentGroup{}, pkgerrors.Wrap(err, "Error updating the stateInfo of the DeploymentIntentGroup: "+d.MetaData.Name) } @@ -252,10 +256,33 @@ func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p s Version: v, } s, err := c.GetDeploymentIntentGroupState(di, p, ca, v) - if err == nil && s.State == state.StateEnum.Instantiated { + if err != nil { + return pkgerrors.Errorf("Error getting stateInfo from DeploymentIntentGroup: " + di) + } + + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from DeploymentIntentGroup stateInfo: " + di) + } + + if stateVal == state.StateEnum.Instantiated { return pkgerrors.Errorf("DeploymentIntentGroup must be terminated before it can be deleted " + di) } + // remove the app contexts associated with thie Deployment Intent Group + if stateVal == state.StateEnum.Terminated { + for _, id := range state.GetContextIdsFromStateInfo(s) { + context, err := state.GetAppContextFromId(id) + if err != nil { + return pkgerrors.Wrap(err, "Error getting appcontext from Deployment Intent Group StateInfo") + } + err = context.DeleteCompositeApp() + if err != nil { + return pkgerrors.Wrap(err, "Error deleting appcontext for Deployment Intent Group") + } + } + } + err = db.DBconn.Remove(c.storeName, k) if err != nil { return pkgerrors.Wrap(err, "Error deleting DeploymentIntentGroup entry") diff --git a/src/orchestrator/pkg/module/instantiation.go b/src/orchestrator/pkg/module/instantiation.go index 9c0c9e31..08250d16 100644 --- a/src/orchestrator/pkg/module/instantiation.go +++ b/src/orchestrator/pkg/module/instantiation.go @@ -20,6 +20,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "time" rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic" @@ -100,7 +101,11 @@ func (c InstantiationClient) Approve(p string, ca string, v string, di string) e if err != nil { return pkgerrors.Wrap(err, "DeploymentIntentGroup has no state info: "+di) } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from DeploymentIntentGroup stateInfo: " + di) + } + switch stateVal { case state.StateEnum.Approved: return nil case state.StateEnum.Terminated: @@ -108,11 +113,11 @@ func (c InstantiationClient) Approve(p string, ca string, v string, di string) e case state.StateEnum.Created: break case state.StateEnum.Applied: - return pkgerrors.Errorf("DeploymentIntentGroup is in an invalid state" + s.State) + return pkgerrors.Errorf("DeploymentIntentGroup is in an invalid state" + stateVal) case state.StateEnum.Instantiated: return pkgerrors.Errorf("DeploymentIntentGroup has already been instantiated" + di) default: - return pkgerrors.Errorf("DeploymentIntentGroup is in an unknown state" + s.State) + return pkgerrors.Errorf("DeploymentIntentGroup is in an unknown state" + stateVal) } key := DeploymentIntentGroupKey{ @@ -121,12 +126,14 @@ func (c InstantiationClient) Approve(p string, ca string, v string, di string) e CompositeApp: ca, Version: v, } - stateInfo := state.StateInfo{ + a := state.ActionEntry{ State: state.StateEnum.Approved, ContextId: "", + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, stateInfo) + err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, s) if err != nil { return pkgerrors.Wrap(err, "Error updating the stateInfo of the DeploymentIntentGroup: "+di) } @@ -229,7 +236,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin if err != nil { return pkgerrors.Errorf("Error retrieving DeploymentIntentGroup stateInfo: " + di) } - switch s.State { + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from DeploymentIntentGroup stateInfo: " + di) + } + switch stateVal { case state.StateEnum.Approved: break case state.StateEnum.Terminated: @@ -241,7 +252,7 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin case state.StateEnum.Instantiated: return pkgerrors.Errorf("DeploymentIntentGroup has already been instantiated" + di) default: - return pkgerrors.Errorf("DeploymentIntentGroup is in an unknown state" + s.State) + return pkgerrors.Errorf("DeploymentIntentGroup is in an unknown state" + stateVal) } rName := dIGrp.Spec.Version //rName is releaseName @@ -286,6 +297,7 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin sortedTemplates, err := GetSortedTemplateForApp(eachApp.Metadata.Name, p, ca, v, rName, cp, overrideValues) if err != nil { + deleteAppContext(context) return pkgerrors.Wrap(err, "Unable to get the sorted templates for app") } @@ -293,16 +305,19 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin resources, err := getResources(sortedTemplates) if err != nil { + deleteAppContext(context) return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name) } specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent) if err != nil { + deleteAppContext(context) return pkgerrors.Wrap(err, "Unable to get the intents for app") } // listOfClusters shall have both mandatoryClusters and optionalClusters where the app needs to be installed. listOfClusters, err := gpic.IntentResolver(specData.Intent) if err != nil { + deleteAppContext(context) return pkgerrors.Wrap(err, "Unable to get the intents resolved for app") } @@ -312,85 +327,100 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin // Add an app to the app context apphandle, err := context.AddApp(compositeHandle, eachApp.Metadata.Name) if err != nil { - cleanuperr := context.DeleteCompositeApp() - if cleanuperr != nil { - log.Info(":: Error Cleaning up AppContext compositeApp failure ::", log.Fields{"Error": cleanuperr.Error(), "AppName": eachApp.Metadata.Name}) - } + deleteAppContext(context) return pkgerrors.Wrap(err, "Error adding App to AppContext") } err = addClustersToAppContext(listOfClusters, context, apphandle, resources) if err != nil { - log.Info(":: Error while adding cluster and resources to app ::", log.Fields{"Error": err.Error(), "AppName": eachApp.Metadata.Name}) + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error while adding cluster and resources to app") } err = verifyResources(listOfClusters, context, resources, eachApp.Metadata.Name) if err != nil { - log.Info(":: Error while verifying resources in app ::", log.Fields{"Error": err.Error(), "AppName": eachApp.Metadata.Name}) + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error while verifying resources in app: ") } - } - jappOrderInstr, _ := json.Marshal(appOrderInstr) + jappOrderInstr, err := json.Marshal(appOrderInstr) + if err != nil { + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error marshalling app order instruction") + } appDepInstr.Appdep = appdep - jappDepInstr, _ := json.Marshal(appDepInstr) - context.AddInstruction(compositeHandle, "app", "order", string(jappOrderInstr)) - context.AddInstruction(compositeHandle, "app", "dependency", string(jappDepInstr)) - //END: storing into etcd - - // BEGIN:: save the context in the orchestrator db record - key := DeploymentIntentGroupKey{ - Name: di, - Project: p, - CompositeApp: ca, - Version: v, + jappDepInstr, err := json.Marshal(appDepInstr) + if err != nil { + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error marshalling app dependency instruction") } - stateInfo := state.StateInfo{ - State: state.StateEnum.Instantiated, - ContextId: ctxval.(string), + _, err = context.AddInstruction(compositeHandle, "app", "order", string(jappOrderInstr)) + if err != nil { + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error adding app dependency instruction") } - err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, stateInfo) + _, err = context.AddInstruction(compositeHandle, "app", "dependency", string(jappDepInstr)) if err != nil { - cleanuperr := context.DeleteCompositeApp() - if cleanuperr != nil { - - log.Info(":: Error Cleaning up AppContext while saving context in the db for GPIntent ::", log.Fields{"Error": cleanuperr.Error(), "GPIntent": gIntent, "DeploymentIntentGroup": di, "CompositeApp": ca, "CompositeAppVersion": v, "Project": p}) - } - return pkgerrors.Wrap(err, "Error adding AppContext to DB") + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error adding app dependency instruction") } - // END:: save the context in the orchestrator db record + //END: storing into etcd // BEGIN: scheduler code pl, mapOfControllers, err := getPrioritizedControllerList(p, ca, v, di) if err != nil { - return err + return pkgerrors.Wrap(err, "Error adding getting prioritized controller list") } log.Info("Priority Based List ", log.Fields{"PlacementControllers::": pl.pPlaCont, "ActionControllers::": pl.pActCont, "mapOfControllers::": mapOfControllers}) err = callGrpcForControllerList(pl.pPlaCont, mapOfControllers, ctxval) if err != nil { - return err + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error calling gRPC for placement controller list") } err = deleteExtraClusters(allApps, context) if err != nil { - return err + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error deleting extra clusters") } err = callGrpcForControllerList(pl.pActCont, mapOfControllers, ctxval) if err != nil { - return err + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error calling gRPC for action controller list") } - // END: Scheduler code // BEGIN : Rsync code err = callRsyncInstall(ctxval) if err != nil { - return err + deleteAppContext(context) + return pkgerrors.Wrap(err, "Error calling rsync") } // END : Rsyc code - log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca}) + // BEGIN:: save the context in the orchestrator db record + key := DeploymentIntentGroupKey{ + Name: di, + Project: p, + CompositeApp: ca, + Version: v, + } + a := state.ActionEntry{ + State: state.StateEnum.Instantiated, + ContextId: ctxval.(string), + TimeStamp: time.Now(), + } + s.Actions = append(s.Actions, a) + err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, s) + if err != nil { + log.Warn(":: Error updating DeploymentIntentGroup state in DB ::", log.Fields{"Error": err.Error(), "GPIntent": gIntent, "DeploymentIntentGroup": di, "CompositeApp": ca, "CompositeAppVersion": v, "Project": p, "AppContext": ctxval.(string)}) + return pkgerrors.Wrap(err, "Error adding DeploymentIntentGroup state to DB") + } + // END:: save the context in the orchestrator db record + + log.Info(":: Done with instantiation call to rsync... ::", log.Fields{"CompositeAppName": ca}) return err } @@ -406,7 +436,8 @@ func (c InstantiationClient) Status(p string, ca string, v string, di string) (S return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found: "+di) } - ac, err := state.GetAppContextFromStateInfo(s) + currentCtxId := state.GetLastContextIdFromStateInfo(s) + ac, err := state.GetAppContextFromId(currentCtxId) if err != nil { return StatusData{}, pkgerrors.Wrap(err, "AppContext for deploymentIntentGroup not found: "+di) } @@ -430,7 +461,7 @@ func (c InstantiationClient) Status(p string, ca string, v string, di string) (S } for _, cluster := range clusters { - handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster) + handle, err := ac.GetClusterStatusHandle(app.Metadata.Name, cluster) if err != nil { log.Info(":: No status handle for cluster, app ::", log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err}) @@ -470,23 +501,21 @@ func (c InstantiationClient) Terminate(p string, ca string, v string, di string) s, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupState(di, p, ca, v) if err != nil { return pkgerrors.Wrap(err, "DeploymentIntentGroup has no state info: "+di) - } else if s.State != state.StateEnum.Instantiated { - return pkgerrors.Errorf("DeploymentIntentGroup is not instantiated" + di) } - ac, err := state.GetAppContextFromStateInfo(s) + stateVal, err := state.GetCurrentStateFromStateInfo(s) if err != nil { - return pkgerrors.Wrap(err, "AppContext for deploymentIntentGroup not found: "+di) + return pkgerrors.Errorf("Error getting current state from DeploymentIntentGroup stateInfo: " + di) } - err = callRsyncUninstall(s.ContextId) - if err != nil { - return err + if stateVal != state.StateEnum.Instantiated { + return pkgerrors.Errorf("DeploymentIntentGroup is not instantiated" + di) } - err = ac.DeleteCompositeApp() + currentCtxId := state.GetLastContextIdFromStateInfo(s) + err = callRsyncUninstall(currentCtxId) if err != nil { - return pkgerrors.Wrap(err, "Error deleting the app context for DeploymentIntentGroup: "+di) + return err } key := DeploymentIntentGroupKey{ @@ -495,12 +524,14 @@ func (c InstantiationClient) Terminate(p string, ca string, v string, di string) CompositeApp: ca, Version: v, } - stateInfo := state.StateInfo{ + a := state.ActionEntry{ State: state.StateEnum.Terminated, - ContextId: "", + ContextId: currentCtxId, + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, stateInfo) + err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagState, s) if err != nil { return pkgerrors.Wrap(err, "Error updating the stateInfo of the DeploymentIntentGroup: "+di) } diff --git a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go index 9ace81b6..692cdf1e 100644 --- a/src/orchestrator/pkg/module/instantiation_appcontext_helper.go +++ b/src/orchestrator/pkg/module/instantiation_appcontext_helper.go @@ -71,6 +71,16 @@ func makeAppContextForCompositeApp(p, ca, v, rName string) (contextForCompositeA } +// deleteAppContext removes an appcontext +func deleteAppContext(ct appcontext.AppContext) error { + err := ct.DeleteCompositeApp() + if err != nil { + log.Warn(":: Error deleting AppContext ::", log.Fields{"Error": err}) + return pkgerrors.Wrapf(err, "Error Deleteing AppContext") + } + return nil +} + // getResources shall take in the sorted templates and output the resources // which consists of name(name+kind) and filecontent func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) { @@ -208,7 +218,7 @@ func verifyResources(l gpic.ClusterList, ct appcontext.AppContext, resources []r for _, res := range resources { rh, err := ct.GetResourceHandle(appName, cn, res.name) if err != nil { - return pkgerrors.Wrapf(err, "Error getting resoure handle for resource :: %s, app:: %s, cluster :: %s, groupName :: %s", appName, res.name, cn, gn) + return pkgerrors.Wrapf(err, "Error getting resource handle for resource :: %s, app:: %s, cluster :: %s, groupName :: %s", appName, res.name, cn, gn) } log.Info(":: GetResourceHandle ::", log.Fields{"ResourceHandler": rh, "appName": appName, "Cluster": cn, "Resource": res.name}) } diff --git a/src/orchestrator/pkg/resourcestatus/resourcestatus.go b/src/orchestrator/pkg/resourcestatus/resourcestatus.go new file mode 100644 index 00000000..f399e29e --- /dev/null +++ b/src/orchestrator/pkg/resourcestatus/resourcestatus.go @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package resourcestatus + +// ResourceStatus struct is used to maintain the rsync status for resources in the appcontext +// that rsync is synchronizing to clusters +type ResourceStatus struct { + Status RsyncStatus +} + +type RsyncStatus = string + +type statusValues struct { + Pending RsyncStatus + Applied RsyncStatus + Failed RsyncStatus + Retrying RsyncStatus + Deleted RsyncStatus +} + +var RsyncStatusEnum = &statusValues{ + Pending: "Pending", + Applied: "Applied", + Failed: "Failed", + Retrying: "Retrying", + Deleted: "Deleted", +} diff --git a/src/orchestrator/pkg/rtcontext/rtcontext.go b/src/orchestrator/pkg/rtcontext/rtcontext.go index f3905eb0..f77fb329 100644 --- a/src/orchestrator/pkg/rtcontext/rtcontext.go +++ b/src/orchestrator/pkg/rtcontext/rtcontext.go @@ -41,7 +41,6 @@ type Rtcontext interface { RtcAddMeta(meta interface{}) error RtcGet() (interface{}, error) RtcAddLevel(handle interface{}, level string, value string) (interface{}, error) - RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error) RtcDeletePair(handle interface{}) error @@ -203,26 +202,6 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in return (interface{})(key), nil } -// Add status under the given level and return new handle -func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) { - - str := fmt.Sprintf("%v", handle) - sid := fmt.Sprintf("%v", rtc.cid) - if !strings.HasPrefix(str, sid) { - return nil, pkgerrors.Errorf("Not a valid run time context handle") - } - if value == nil { - return nil, pkgerrors.Errorf("Not a valid run time context resource value") - } - - k := str + "status" + "/" - err := contextdb.Db.Put(k, value) - if err != nil { - return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error()) - } - return (interface{})(k), nil -} - // Add a resource under the given level and return new handle func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) { diff --git a/src/orchestrator/pkg/state/state_helper.go b/src/orchestrator/pkg/state/state_helper.go index a65cea8d..9d59fb75 100644 --- a/src/orchestrator/pkg/state/state_helper.go +++ b/src/orchestrator/pkg/state/state_helper.go @@ -16,14 +16,56 @@ package state -import "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" +import ( + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + pkgerrors "github.com/pkg/errors" +) // GetAppContextFromStateInfo loads the appcontext present in the StateInfo input -func GetAppContextFromStateInfo(s StateInfo) (appcontext.AppContext, error) { +func GetAppContextFromId(ctxid string) (appcontext.AppContext, error) { var cc appcontext.AppContext - _, err := cc.LoadAppContext(s.ContextId) + _, err := cc.LoadAppContext(ctxid) if err != nil { return appcontext.AppContext{}, err } return cc, nil } + +// GetCurrentStateFromStatInfo gets the last (current) state from StateInfo +func GetCurrentStateFromStateInfo(s StateInfo) (StateValue, error) { + alen := len(s.Actions) + if alen == 0 { + return StateEnum.Undefined, pkgerrors.Errorf("No state information") + } + return s.Actions[alen-1].State, nil +} + +// GetLastContextFromStatInfo gets the last (most recent) context id from StateInfo +func GetLastContextIdFromStateInfo(s StateInfo) string { + alen := len(s.Actions) + if alen > 0 { + return s.Actions[alen-1].ContextId + } else { + return "" + } +} + +// GetContextIdsFromStatInfo return a list of the unique AppContext Ids in the StateInfo +func GetContextIdsFromStateInfo(s StateInfo) []string { + m := make(map[string]string) + + for _, a := range s.Actions { + if a.ContextId != "" { + m[a.ContextId] = "" + } + } + + ids := make([]string, len(m)) + i := 0 + for k := range m { + ids[i] = k + i++ + } + + return ids +} diff --git a/src/orchestrator/pkg/state/types.go b/src/orchestrator/pkg/state/types.go index 25fb60d2..665a1be4 100644 --- a/src/orchestrator/pkg/state/types.go +++ b/src/orchestrator/pkg/state/types.go @@ -16,16 +16,27 @@ package state +import "time" + // StateInfo struct is used to maintain the values for state, contextid, (and other) // information about resources which can be instantiated via rsync. +// The last Actions entry holds the current state of the container object. type StateInfo struct { + Actions []ActionEntry +} + +// ActionEntry is used to keep track of the time an action (e.g. Created, Instantiate, Terminate) was invoked +// For actions where an AppContext is relevent, the ContextId field will be non-zero length +type ActionEntry struct { State StateValue ContextId string + TimeStamp time.Time } type StateValue = string type states struct { + Undefined StateValue Created StateValue Approved StateValue Applied StateValue @@ -34,6 +45,7 @@ type states struct { } var StateEnum = &states{ + Undefined: "Undefined", Created: "Created", Approved: "Approved", Applied: "Applied", diff --git a/src/rsync/go.mod b/src/rsync/go.mod index 18fef6be..973895a3 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -5,13 +5,13 @@ go 1.13 require ( //client github.com/evanphx/json-patch v4.5.0+incompatible // indirect + github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 github.com/jonboulle/clockwork v0.1.0 - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000 github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200708223327-9a9a6aedbd7a - github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 + github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200721211210-783ed87fb39a //github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.5.0 @@ -29,8 +29,8 @@ require ( replace ( github.com/onap/multicloud-k8s/src/clm => ../clm - github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator github.com/onap/multicloud-k8s/src/monitor => ../monitor + github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator k8s.io/api => k8s.io/api v0.17.3 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery => k8s.io/apimachinery v0.17.3 diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index cc7773b8..f77482e6 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -20,11 +20,12 @@ import ( "context" "encoding/json" "fmt" - "log" "strings" + "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/resourcestatus" kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" @@ -38,40 +39,49 @@ type CompositeAppContext struct { cid interface{} } -func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) { +func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) { var byteRes []byte rh, err := ac.GetResourceHandle(app, cluster, name) if err != nil { - return nil, err + return nil, nil, err + } + sh, err := ac.GetLevelHandle(rh, "status") + if err != nil { + return nil, nil, err } resval, err := ac.GetValue(rh) if err != nil { - return nil, err + return nil, sh, err } if resval != "" { result := strings.Split(name, "+") if result[0] == "" { - return nil, pkgerrors.Errorf("Resource name is nil %s:", name) + return nil, sh, pkgerrors.Errorf("Resource name is nil %s:", name) } byteRes = []byte(fmt.Sprintf("%v", resval.(interface{}))) } else { - return nil, pkgerrors.Errorf("Resource value is nil %s", name) + return nil, sh, pkgerrors.Errorf("Resource value is nil %s", name) } - return byteRes, nil + return byteRes, sh, nil } func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } if err := c.Delete(res); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to delete res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Deleted}) logutils.Info("Deleted::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -80,8 +90,11 @@ func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name stri } func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { - res, err := getRes(ac, name, app, cluster) + res, sh, err := getRes(ac, name, app, cluster) if err != nil { + if sh != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) + } return err } //Decode the yaml to create a runtime.Object @@ -116,12 +129,14 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return err } if err := c.Apply(b); err != nil { + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Failed}) logutils.Error("Failed to apply res", logutils.Fields{ "error": err, "resource": name, }) return err } + ac.UpdateStatusValue(sh, resourcestatus.ResourceStatus{Status: resourcestatus.RsyncStatusEnum.Applied}) logutils.Info("Installed::", logutils.Fields{ "cluster": cluster, "resource": name, @@ -129,6 +144,116 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return nil } +// Wait for 2 secs +const waitTime = 2 + +func waitForClusterReady(c *kubeclient.Client, cluster string) error { + for { + if err := c.IsReachable(); err != nil { + // TODO: Add more realistic error checking + // TODO: Add Incremental wait logic here + time.Sleep(waitTime * time.Second) + } else { + break + } + } + logutils.Info("Cluster is reachable::", logutils.Fields{ + "cluster": cluster, + }) + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + h, err := ac.GetCompositeAppHandle() + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(h, "status") + if sh == nil { + _, err = ac.AddLevelValue(h, "status", acStatus) + } else { + err = ac.UpdateValue(sh, acStatus) + } + if err != nil { + return err + } + return nil +} + +// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext +func initializeResourceStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error { + statusPending := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Pending, + } + statusDeleted := resourcestatus.ResourceStatus{ + Status: resourcestatus.RsyncStatusEnum.Deleted, + } + + appsOrder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + + for _, app := range appList["apporder"] { + clusterNames, err := ac.GetClusterNames(app) + if err != nil { + return err + } + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + resorder, err := ac.GetResourceInstruction(app, cluster, "order") + if err != nil { + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for _, res := range aov["resorder"] { + rh, err := ac.GetResourceHandle(app, cluster, res) + if err != nil { + return err + } + sh, err := ac.GetLevelHandle(rh, "status") + if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusPending) + } + if err != nil { + return err + } + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + if sh == nil { + _, err = ac.AddLevelValue(rh, "status", statusDeleted) + } else { + s, err := ac.GetValue(sh) + if err != nil { + return err + } + rStatus := resourcestatus.ResourceStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &rStatus) + if rStatus.Status == resourcestatus.RsyncStatusEnum.Applied { + err = ac.UpdateStatusValue(sh, statusPending) + } else { + err = ac.UpdateStatusValue(sh, statusDeleted) + } + if err != nil { + return err + } + } + } else { + return pkgerrors.Errorf("Error intializing AppContext Resource Statuses") + } + } + } + } + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -139,6 +264,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st }) return err } + // TODO: Check reachability? if err = c.Apply(b); err != nil { logutils.Error("Failed to apply status tracker", logutils.Fields{ "error": err, @@ -181,17 +307,62 @@ func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label return nil } +func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{}, failure bool) error { + sh, err := ac.GetLevelHandle(handle, "status") + if err != nil { + return err + } + s, err := ac.GetValue(sh) + if err != nil { + return err + } + acStatus := appcontext.AppContextStatus{} + js, _ := json.Marshal(s) + json.Unmarshal(js, &acStatus) + + if failure { + acStatus.Status = appcontext.AppContextStatusEnum.Failed + } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating { + acStatus.Status = appcontext.AppContextStatusEnum.Instantiated + } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating { + acStatus.Status = appcontext.AppContextStatusEnum.Terminated + } else { + return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus) + } + + err = ac.UpdateValue(sh, acStatus) + if err != nil { + return err + } + return nil +} + type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error { +func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error { + con := connector.Init(cid) + //Cleanup + defer con.RemoveClient() ac := appcontext.AppContext{} - g, _ := errgroup.WithContext(context.Background()) - _, err := ac.LoadAppContext(cid) + h, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + + // initialize appcontext status + err = initializeAppContextStatus(ac, acStatus) + if err != nil { + return err + } + + // initialize the resource status values before proceeding with the function + err = initializeResourceStatus(ac, acStatus) if err != nil { return err } + appsOrder, err := ac.GetAppInstruction("order") if err != nil { return err @@ -203,9 +374,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn "string": appList, }) id, _ := ac.GetCompositeAppHandle() - + g, _ := errgroup.WithContext(context.Background()) + // Iterate over all the subapps for _, app := range appList["apporder"] { - appName := app results := strings.Split(id.(string), "/") label := results[2] + "-" + app @@ -214,14 +385,17 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn if err != nil { return err } - rg, _ := errgroup.WithContext(context.Background()) + // Iterate over all clusters for k := 0; k < len(clusterNames); k++ { cluster := clusterNames[k] err = status.StartClusterWatcher(cluster) if err != nil { - log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) + logutils.Error("Error starting Cluster Watcher", logutils.Fields{ + "error": err, + "cluster": cluster, + }) } - rg.Go(func() error { + g.Go(func() error { c, err := con.GetClient(cluster) if err != nil { logutils.Error("Error in creating kubeconfig client", logutils.Fields{ @@ -238,75 +412,86 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn } var aov map[string][]string json.Unmarshal([]byte(resorder.(string)), &aov) - for i, res := range aov["resorder"] { - err = f(ac, c, res, appName, cluster, label) + // Keep retrying for reachability + for { + // Wait for cluster to be reachable + err = waitForClusterReady(c, cluster) if err != nil { - logutils.Error("Error in resource %s: %v", logutils.Fields{ - "error": err, - "cluster": cluster, - "resource": res, - }) - if breakonError { - // handle status tracking before exiting if at least one resource got handled - if i > 0 { - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // TODO: Add error handling + return err + } + reachable := true + // Handle all resources in order + for i, res := range aov["resorder"] { + err = f(ac, c, res, appName, cluster, label) + if err != nil { + logutils.Error("Error in resource %s: %v", logutils.Fields{ + "error": err, + "cluster": cluster, + "resource": res, + }) + // If failure is due to reachability issues start retrying + if err = c.IsReachable(); err != nil { + reachable = false + break + } + if breakonError { + // handle status tracking before exiting if at least one resource got handled + if i > 0 { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } } + return err } - return err } } - } - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // Check if the break from loop due to reachabilty issues + if reachable != false { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } + // Done processing cluster without errors + return nil + } } return nil }) } - if err := rg.Wait(); err != nil { - logutils.Error("Encountered error in App cluster", logutils.Fields{ - "error": err, - }) - return err - } return nil }) } + // Wait for all subtasks to complete if err := g.Wait(); err != nil { + uperr := updateEndingAppContextStatus(ac, h, true) + if uperr != nil { + logutils.Error("Encountered error updating AppContext to Failed status", logutils.Fields{"error": uperr}) + } logutils.Error("Encountered error", logutils.Fields{ "error": err, }) return err } + err = updateEndingAppContextStatus(ac, h, false) + if err != nil { + logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err}) + return err + } return nil } // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true) - if err != nil { - logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err}) - return err - } - //Cleanup - con.RemoveClient() + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating}, + instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false) - if err != nil { - logutils.Error("TerminateComApp unsuccessful", logutils.Fields{ - "error": err, - }) - return err - } - //Cleanup - con.RemoveClient() + go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating}, + terminateResource, deleteStatusTracker, false) return nil } diff --git a/src/rsync/pkg/grpc/installappserver/installappserver.go b/src/rsync/pkg/grpc/installappserver/installappserver.go index 68118ade..d70000c0 100644 --- a/src/rsync/pkg/grpc/installappserver/installappserver.go +++ b/src/rsync/pkg/grpc/installappserver/installappserver.go @@ -17,8 +17,9 @@ import ( "context" "encoding/json" "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/context" + "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" ) type installappServer struct { @@ -29,17 +30,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst installAppReq, _ := json.Marshal(req) log.Println("GRPC Server received installAppRequest: ", string(installAppReq)) - // Try instantiate the comp app + // Try instantiate the comp app instca := con.CompositeAppContext{} - err := instca.InstantiateComApp(req.GetAppContext()) - if err != nil { - log.Println("Instantiation failed: " + err.Error()) + err := instca.InstantiateComApp(req.GetAppContext()) + if err != nil { + log.Println("Instantiation failed: " + err.Error()) err := instca.TerminateComApp(req.GetAppContext()) if err != nil { log.Println("Termination failed: " + err.Error()) } return &installapp.InstallAppResponse{AppContextInstalled: false}, err - } + } return &installapp.InstallAppResponse{AppContextInstalled: true}, nil } diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go index 8c1e12be..74334278 100644 --- a/src/rsync/pkg/status/status.go +++ b/src/rsync/pkg/status/status.go @@ -81,15 +81,17 @@ func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleS return } + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Error getting cluster handle::", err) + return + } // Get the handle for the context/app/cluster status object - handle, _ := ac.GetStatusHandle(result[1], clusterId) + handle, _ := ac.GetLevelHandle(chandle, "status") // If status handle was not found, then create the status object in the appcontext if handle == nil { - chandle, err := ac.GetClusterHandle(result[1], clusterId) - if err == nil { - ac.AddStatus(chandle, string(vjson)) - } + ac.AddLevelValue(chandle, "status", string(vjson)) } else { ac.UpdateStatusValue(handle, string(vjson)) } |