diff options
Diffstat (limited to 'src/ncm/pkg/scheduler/scheduler.go')
-rw-r--r-- | src/ncm/pkg/scheduler/scheduler.go | 288 |
1 files changed, 204 insertions, 84 deletions
diff --git a/src/ncm/pkg/scheduler/scheduler.go b/src/ncm/pkg/scheduler/scheduler.go index 29d67662..516c0525 100644 --- a/src/ncm/pkg/scheduler/scheduler.go +++ b/src/ncm/pkg/scheduler/scheduler.go @@ -17,27 +17,32 @@ package scheduler import ( - "context" "encoding/json" + "fmt" "time" clusterPkg "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" - "github.com/onap/multicloud-k8s/src/ncm/internal/grpc" oc "github.com/onap/multicloud-k8s/src/ncm/internal/ovncontroller" ncmtypes "github.com/onap/multicloud-k8s/src/ncm/pkg/module/types" nettypes "github.com/onap/multicloud-k8s/src/ncm/pkg/networkintents/types" appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/grpc/installappclient" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db" log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" - "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc" - installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/status" pkgerrors "github.com/pkg/errors" ) +// rsyncName denotes the name of the rsync controller +const rsyncName = "rsync" + // ClusterManager is an interface exposes the Cluster functionality type SchedulerManager interface { ApplyNetworkIntents(clusterProvider, cluster string) error + NetworkIntentsStatus(clusterProvider, cluster, qInstance, qType, qOutput string, qApps, qClusters, qResources []string) (ClusterStatus, error) TerminateNetworkIntents(clusterProvider, cluster string) error } @@ -55,17 +60,106 @@ func NewSchedulerClient() *SchedulerClient { StoreName: "cluster", TagMeta: "clustermetadata", TagContent: "clustercontent", - TagContext: "clustercontext", + TagState: "stateInfo", }, } } +// ClusterStatus holds the status data prepared for cluster network intent status queries +type ClusterStatus struct { + status.StatusResult `json:",inline"` +} + +func deleteAppContext(ac appcontext.AppContext) { + err := ac.DeleteCompositeApp() + if err != nil { + log.Warn(":: Error deleting AppContext ::", log.Fields{"Error": err}) + } +} + +/* +queryDBAndSetRsyncInfo queries the MCO db to find the record the sync controller +and then sets the RsyncInfo global variable. +*/ +func queryDBAndSetRsyncInfo() (installappclient.RsyncInfo, error) { + client := controller.NewControllerClient() + vals, _ := client.GetControllers() + for _, v := range vals { + if v.Metadata.Name == rsyncName { + log.Info("Initializing RPC connection to resource synchronizer", log.Fields{ + "Controller": v.Metadata.Name, + }) + rsyncInfo := installappclient.NewRsyncInfo(v.Metadata.Name, v.Spec.Host, v.Spec.Port) + return rsyncInfo, nil + } + } + return installappclient.RsyncInfo{}, pkgerrors.Errorf("queryRsyncInfoInMCODB Failed - Could not get find rsync by name : %v", rsyncName) +} + +/* +callRsyncInstall method shall take in the app context id and invokes the rsync service via grpc +*/ +func callRsyncInstall(contextid interface{}) error { + rsyncInfo, err := queryDBAndSetRsyncInfo() + log.Info("Calling the Rsync ", log.Fields{ + "RsyncName": rsyncInfo.RsyncName, + }) + if err != nil { + return err + } + + appContextID := fmt.Sprintf("%v", contextid) + err = installappclient.InvokeInstallApp(appContextID) + if err != nil { + return err + } + return nil +} + +/* +callRsyncUninstall method shall take in the app context id and invokes the rsync service via grpc +*/ +func callRsyncUninstall(contextid interface{}) error { + rsyncInfo, err := queryDBAndSetRsyncInfo() + log.Info("Calling the Rsync ", log.Fields{ + "RsyncName": rsyncInfo.RsyncName, + }) + if err != nil { + return err + } + + appContextID := fmt.Sprintf("%v", contextid) + err = installappclient.InvokeUninstallApp(appContextID) + if err != nil { + return err + } + return nil +} + // Apply Network Intents associated with a cluster func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) error { - _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) - if err == nil { - return pkgerrors.Errorf("Cluster network intents have already been applied: %v, %v", clusterProvider, cluster) + s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster) + if err != nil { + return pkgerrors.Errorf("Error finding cluster: %v %v", clusterProvider, cluster) + } + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { + case state.StateEnum.Approved: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) + case state.StateEnum.Terminated: + break + case state.StateEnum.Created: + break + case state.StateEnum.Applied: + return nil + case state.StateEnum.Instantiated: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) + default: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } // Make an app context for the network intent resources @@ -76,19 +170,14 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e } handle, err := ac.CreateCompositeApp() if err != nil { + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error creating AppContext CompositeApp") } // Add an app (fixed value) to the app context apphandle, err := ac.AddApp(handle, nettypes.CONTEXT_CLUSTER_APP) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext CompositeApp create failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding App to AppContext") } @@ -98,28 +187,38 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e }{ []string{nettypes.CONTEXT_CLUSTER_APP}, } - jinstr, _ := json.Marshal(appinstr) + jinstr, err := json.Marshal(appinstr) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error marshalling network intent app order instruction") + } appdepinstr := struct { Appdep map[string]string `json:"appdependency"` }{ map[string]string{nettypes.CONTEXT_CLUSTER_APP: "go"}, } - jdep, _ := json.Marshal(appdepinstr) + jdep, err := json.Marshal(appdepinstr) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error marshalling network intent app dependency instruction") + } _, err = ac.AddInstruction(handle, "app", "order", string(jinstr)) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error adding network intent app order instruction") + } _, err = ac.AddInstruction(handle, "app", "dependency", string(jdep)) + if err != nil { + deleteAppContext(ac) + return pkgerrors.Wrap(err, "Error adding network intent app dependency instruction") + } // Add a cluster to the app _, err = ac.AddCluster(apphandle, clusterProvider+nettypes.SEPARATOR+cluster) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after add cluster failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding Cluster to AppContext") } @@ -129,58 +228,33 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e // their own context err = oc.Apply(ctxVal, clusterProvider, cluster) if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after controller failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } + deleteAppContext(ac) return pkgerrors.Wrap(err, "Error adding Cluster to AppContext") } - // save the context in the cluster db record + // call resource synchronizer to instantiate the CRs in the cluster + err = callRsyncInstall(ctxVal) + if err != nil { + deleteAppContext(ac) + return err + } + + // update the StateInfo in the cluster db record key := clusterPkg.ClusterKey{ ClusterProviderName: clusterProvider, ClusterName: cluster, } - err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagContext, ctxVal) - if err != nil { - cleanuperr := ac.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error cleaning AppContext after DB insert failure", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) - } - return pkgerrors.Wrap(err, "Error adding AppContext to DB") + a := state.ActionEntry{ + State: state.StateEnum.Applied, + ContextId: ctxVal.(string), + TimeStamp: time.Now(), } + s.Actions = append(s.Actions, a) - // call resource synchronizer to instantiate the CRs in the cluster - conn := rpc.GetRpcConn(grpc.RsyncName) - if conn == nil { - grpc.InitRsyncClient() - conn = rpc.GetRpcConn(grpc.RsyncName) - } - - var rpcClient installpb.InstallappClient - var installRes *installpb.InstallAppResponse - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if conn != nil { - rpcClient = installpb.NewInstallappClient(conn) - installReq := new(installpb.InstallAppRequest) - installReq.AppContext = ctxVal.(string) - installRes, err = rpcClient.InstallApp(ctx, installReq) - if err == nil { - log.Info("Response from InstappApp GRPC call", log.Fields{ - "Succeeded": installRes.AppContextInstalled, - "Message": installRes.AppContextInstallMessage, - }) - } - } else { - return pkgerrors.Errorf("InstallApp Failed - Could not get InstallAppClient: %v", grpc.RsyncName) + err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s) + if err != nil { + log.Warn(":: Error updating Cluster state in DB ::", log.Fields{"Error": err.Error(), "cluster": cluster, "cluster provider": clusterProvider, "AppContext": ctxVal.(string)}) + return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster after Apply on network intents: "+cluster) } return nil @@ -188,33 +262,79 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e // Terminate Network Intents associated with a cluster func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster string) error { - context, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster) + s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster) if err != nil { - return pkgerrors.Wrapf(err, "Error finding AppContext for cluster: %v, %v", clusterProvider, cluster) + return pkgerrors.Wrapf(err, "Error finding StateInfo for cluster: %v, %v", clusterProvider, cluster) + } + stateVal, err := state.GetCurrentStateFromStateInfo(s) + if err != nil { + return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster) + } + switch stateVal { + case state.StateEnum.Approved: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved) + case state.StateEnum.Terminated: + return nil + case state.StateEnum.Created: + return pkgerrors.Wrap(err, "Cluster network intents have not been applied: "+cluster) + case state.StateEnum.Applied: + break + case state.StateEnum.Instantiated: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated) + default: + return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal) } - // TODO: call resource synchronizer to terminate the CRs in the cluster - - // remove the app context - cleanuperr := context.DeleteCompositeApp() - if cleanuperr != nil { - log.Warn("Error deleted AppContext", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) + // call resource synchronizer to terminate the CRs in the cluster + contextId := state.GetLastContextIdFromStateInfo(s) + err = callRsyncUninstall(contextId) + if err != nil { + return err } - // remove the app context field from the cluster db record + // update StateInfo key := clusterPkg.ClusterKey{ ClusterProviderName: clusterProvider, ClusterName: cluster, } - err = db.DBconn.RemoveTag(v.db.StoreName, key, v.db.TagContext) + a := state.ActionEntry{ + State: state.StateEnum.Terminated, + ContextId: contextId, + TimeStamp: time.Now(), + } + s.Actions = append(s.Actions, a) + err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s) if err != nil { - log.Warn("Error removing AppContext from Cluster document", log.Fields{ - "cluster-provider": clusterProvider, - "cluster": cluster, - }) + return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster: "+cluster) } + return nil } + +/* +NetworkIntentsStatus takes in cluster provider, cluster and query parameters. +This method is responsible obtaining the status of +the cluster network intents, which is made available in the appcontext +*/ +func (c SchedulerClient) NetworkIntentsStatus(clusterProvider, cluster, qInstance, qType, qOutput string, qApps, qClusters, qResources []string) (ClusterStatus, error) { + + s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster) + if err != nil { + return ClusterStatus{}, pkgerrors.Wrap(err, "cluster state not found") + } + + // Prepare the apps list (just one hardcoded value) + allApps := make([]string, 0) + allApps = append(allApps, nettypes.CONTEXT_CLUSTER_APP) + + statusResponse, err := status.PrepareStatusResult(s, allApps, qInstance, qType, qOutput, qApps, qClusters, qResources) + if err != nil { + return ClusterStatus{}, err + } + statusResponse.Name = clusterProvider + "+" + cluster + clStatus := ClusterStatus{ + StatusResult: statusResponse, + } + + return clStatus, nil +} |