aboutsummaryrefslogtreecommitdiffstats
path: root/src/ncm/pkg/scheduler/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/ncm/pkg/scheduler/scheduler.go')
-rw-r--r--src/ncm/pkg/scheduler/scheduler.go288
1 files changed, 204 insertions, 84 deletions
diff --git a/src/ncm/pkg/scheduler/scheduler.go b/src/ncm/pkg/scheduler/scheduler.go
index 29d67662..516c0525 100644
--- a/src/ncm/pkg/scheduler/scheduler.go
+++ b/src/ncm/pkg/scheduler/scheduler.go
@@ -17,27 +17,32 @@
package scheduler
import (
- "context"
"encoding/json"
+ "fmt"
"time"
clusterPkg "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
- "github.com/onap/multicloud-k8s/src/ncm/internal/grpc"
oc "github.com/onap/multicloud-k8s/src/ncm/internal/ovncontroller"
ncmtypes "github.com/onap/multicloud-k8s/src/ncm/pkg/module/types"
nettypes "github.com/onap/multicloud-k8s/src/ncm/pkg/networkintents/types"
appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/grpc/installappclient"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
- "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc"
- installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/controller"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/status"
pkgerrors "github.com/pkg/errors"
)
+// rsyncName denotes the name of the rsync controller
+const rsyncName = "rsync"
+
// ClusterManager is an interface exposes the Cluster functionality
type SchedulerManager interface {
ApplyNetworkIntents(clusterProvider, cluster string) error
+ NetworkIntentsStatus(clusterProvider, cluster, qInstance, qType, qOutput string, qApps, qClusters, qResources []string) (ClusterStatus, error)
TerminateNetworkIntents(clusterProvider, cluster string) error
}
@@ -55,17 +60,106 @@ func NewSchedulerClient() *SchedulerClient {
StoreName: "cluster",
TagMeta: "clustermetadata",
TagContent: "clustercontent",
- TagContext: "clustercontext",
+ TagState: "stateInfo",
},
}
}
+// ClusterStatus holds the status data prepared for cluster network intent status queries
+type ClusterStatus struct {
+ status.StatusResult `json:",inline"`
+}
+
+func deleteAppContext(ac appcontext.AppContext) {
+ err := ac.DeleteCompositeApp()
+ if err != nil {
+ log.Warn(":: Error deleting AppContext ::", log.Fields{"Error": err})
+ }
+}
+
+/*
+queryDBAndSetRsyncInfo queries the MCO db to find the record the sync controller
+and then sets the RsyncInfo global variable.
+*/
+func queryDBAndSetRsyncInfo() (installappclient.RsyncInfo, error) {
+ client := controller.NewControllerClient()
+ vals, _ := client.GetControllers()
+ for _, v := range vals {
+ if v.Metadata.Name == rsyncName {
+ log.Info("Initializing RPC connection to resource synchronizer", log.Fields{
+ "Controller": v.Metadata.Name,
+ })
+ rsyncInfo := installappclient.NewRsyncInfo(v.Metadata.Name, v.Spec.Host, v.Spec.Port)
+ return rsyncInfo, nil
+ }
+ }
+ return installappclient.RsyncInfo{}, pkgerrors.Errorf("queryRsyncInfoInMCODB Failed - Could not get find rsync by name : %v", rsyncName)
+}
+
+/*
+callRsyncInstall method shall take in the app context id and invokes the rsync service via grpc
+*/
+func callRsyncInstall(contextid interface{}) error {
+ rsyncInfo, err := queryDBAndSetRsyncInfo()
+ log.Info("Calling the Rsync ", log.Fields{
+ "RsyncName": rsyncInfo.RsyncName,
+ })
+ if err != nil {
+ return err
+ }
+
+ appContextID := fmt.Sprintf("%v", contextid)
+ err = installappclient.InvokeInstallApp(appContextID)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+/*
+callRsyncUninstall method shall take in the app context id and invokes the rsync service via grpc
+*/
+func callRsyncUninstall(contextid interface{}) error {
+ rsyncInfo, err := queryDBAndSetRsyncInfo()
+ log.Info("Calling the Rsync ", log.Fields{
+ "RsyncName": rsyncInfo.RsyncName,
+ })
+ if err != nil {
+ return err
+ }
+
+ appContextID := fmt.Sprintf("%v", contextid)
+ err = installappclient.InvokeUninstallApp(appContextID)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
// Apply Network Intents associated with a cluster
func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) error {
- _, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
- if err == nil {
- return pkgerrors.Errorf("Cluster network intents have already been applied: %v, %v", clusterProvider, cluster)
+ s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster)
+ if err != nil {
+ return pkgerrors.Errorf("Error finding cluster: %v %v", clusterProvider, cluster)
+ }
+ stateVal, err := state.GetCurrentStateFromStateInfo(s)
+ if err != nil {
+ return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster)
+ }
+ switch stateVal {
+ case state.StateEnum.Approved:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved)
+ case state.StateEnum.Terminated:
+ break
+ case state.StateEnum.Created:
+ break
+ case state.StateEnum.Applied:
+ return nil
+ case state.StateEnum.Instantiated:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated)
+ default:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal)
}
// Make an app context for the network intent resources
@@ -76,19 +170,14 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
}
handle, err := ac.CreateCompositeApp()
if err != nil {
+ deleteAppContext(ac)
return pkgerrors.Wrap(err, "Error creating AppContext CompositeApp")
}
// Add an app (fixed value) to the app context
apphandle, err := ac.AddApp(handle, nettypes.CONTEXT_CLUSTER_APP)
if err != nil {
- cleanuperr := ac.DeleteCompositeApp()
- if cleanuperr != nil {
- log.Warn("Error cleaning AppContext CompositeApp create failure", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
- }
+ deleteAppContext(ac)
return pkgerrors.Wrap(err, "Error adding App to AppContext")
}
@@ -98,28 +187,38 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
}{
[]string{nettypes.CONTEXT_CLUSTER_APP},
}
- jinstr, _ := json.Marshal(appinstr)
+ jinstr, err := json.Marshal(appinstr)
+ if err != nil {
+ deleteAppContext(ac)
+ return pkgerrors.Wrap(err, "Error marshalling network intent app order instruction")
+ }
appdepinstr := struct {
Appdep map[string]string `json:"appdependency"`
}{
map[string]string{nettypes.CONTEXT_CLUSTER_APP: "go"},
}
- jdep, _ := json.Marshal(appdepinstr)
+ jdep, err := json.Marshal(appdepinstr)
+ if err != nil {
+ deleteAppContext(ac)
+ return pkgerrors.Wrap(err, "Error marshalling network intent app dependency instruction")
+ }
_, err = ac.AddInstruction(handle, "app", "order", string(jinstr))
+ if err != nil {
+ deleteAppContext(ac)
+ return pkgerrors.Wrap(err, "Error adding network intent app order instruction")
+ }
_, err = ac.AddInstruction(handle, "app", "dependency", string(jdep))
+ if err != nil {
+ deleteAppContext(ac)
+ return pkgerrors.Wrap(err, "Error adding network intent app dependency instruction")
+ }
// Add a cluster to the app
_, err = ac.AddCluster(apphandle, clusterProvider+nettypes.SEPARATOR+cluster)
if err != nil {
- cleanuperr := ac.DeleteCompositeApp()
- if cleanuperr != nil {
- log.Warn("Error cleaning AppContext after add cluster failure", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
- }
+ deleteAppContext(ac)
return pkgerrors.Wrap(err, "Error adding Cluster to AppContext")
}
@@ -129,58 +228,33 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
// their own context
err = oc.Apply(ctxVal, clusterProvider, cluster)
if err != nil {
- cleanuperr := ac.DeleteCompositeApp()
- if cleanuperr != nil {
- log.Warn("Error cleaning AppContext after controller failure", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
- }
+ deleteAppContext(ac)
return pkgerrors.Wrap(err, "Error adding Cluster to AppContext")
}
- // save the context in the cluster db record
+ // call resource synchronizer to instantiate the CRs in the cluster
+ err = callRsyncInstall(ctxVal)
+ if err != nil {
+ deleteAppContext(ac)
+ return err
+ }
+
+ // update the StateInfo in the cluster db record
key := clusterPkg.ClusterKey{
ClusterProviderName: clusterProvider,
ClusterName: cluster,
}
- err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagContext, ctxVal)
- if err != nil {
- cleanuperr := ac.DeleteCompositeApp()
- if cleanuperr != nil {
- log.Warn("Error cleaning AppContext after DB insert failure", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
- }
- return pkgerrors.Wrap(err, "Error adding AppContext to DB")
+ a := state.ActionEntry{
+ State: state.StateEnum.Applied,
+ ContextId: ctxVal.(string),
+ TimeStamp: time.Now(),
}
+ s.Actions = append(s.Actions, a)
- // call resource synchronizer to instantiate the CRs in the cluster
- conn := rpc.GetRpcConn(grpc.RsyncName)
- if conn == nil {
- grpc.InitRsyncClient()
- conn = rpc.GetRpcConn(grpc.RsyncName)
- }
-
- var rpcClient installpb.InstallappClient
- var installRes *installpb.InstallAppResponse
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- if conn != nil {
- rpcClient = installpb.NewInstallappClient(conn)
- installReq := new(installpb.InstallAppRequest)
- installReq.AppContext = ctxVal.(string)
- installRes, err = rpcClient.InstallApp(ctx, installReq)
- if err == nil {
- log.Info("Response from InstappApp GRPC call", log.Fields{
- "Succeeded": installRes.AppContextInstalled,
- "Message": installRes.AppContextInstallMessage,
- })
- }
- } else {
- return pkgerrors.Errorf("InstallApp Failed - Could not get InstallAppClient: %v", grpc.RsyncName)
+ err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s)
+ if err != nil {
+ log.Warn(":: Error updating Cluster state in DB ::", log.Fields{"Error": err.Error(), "cluster": cluster, "cluster provider": clusterProvider, "AppContext": ctxVal.(string)})
+ return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster after Apply on network intents: "+cluster)
}
return nil
@@ -188,33 +262,79 @@ func (v *SchedulerClient) ApplyNetworkIntents(clusterProvider, cluster string) e
// Terminate Network Intents associated with a cluster
func (v *SchedulerClient) TerminateNetworkIntents(clusterProvider, cluster string) error {
- context, err := clusterPkg.NewClusterClient().GetClusterContext(clusterProvider, cluster)
+ s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster)
if err != nil {
- return pkgerrors.Wrapf(err, "Error finding AppContext for cluster: %v, %v", clusterProvider, cluster)
+ return pkgerrors.Wrapf(err, "Error finding StateInfo for cluster: %v, %v", clusterProvider, cluster)
+ }
+ stateVal, err := state.GetCurrentStateFromStateInfo(s)
+ if err != nil {
+ return pkgerrors.Errorf("Error getting current state from Cluster stateInfo: " + cluster)
+ }
+ switch stateVal {
+ case state.StateEnum.Approved:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Approved)
+ case state.StateEnum.Terminated:
+ return nil
+ case state.StateEnum.Created:
+ return pkgerrors.Wrap(err, "Cluster network intents have not been applied: "+cluster)
+ case state.StateEnum.Applied:
+ break
+ case state.StateEnum.Instantiated:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+state.StateEnum.Instantiated)
+ default:
+ return pkgerrors.Wrap(err, "Cluster is in an invalid state: "+cluster+" "+stateVal)
}
- // TODO: call resource synchronizer to terminate the CRs in the cluster
-
- // remove the app context
- cleanuperr := context.DeleteCompositeApp()
- if cleanuperr != nil {
- log.Warn("Error deleted AppContext", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
+ // call resource synchronizer to terminate the CRs in the cluster
+ contextId := state.GetLastContextIdFromStateInfo(s)
+ err = callRsyncUninstall(contextId)
+ if err != nil {
+ return err
}
- // remove the app context field from the cluster db record
+ // update StateInfo
key := clusterPkg.ClusterKey{
ClusterProviderName: clusterProvider,
ClusterName: cluster,
}
- err = db.DBconn.RemoveTag(v.db.StoreName, key, v.db.TagContext)
+ a := state.ActionEntry{
+ State: state.StateEnum.Terminated,
+ ContextId: contextId,
+ TimeStamp: time.Now(),
+ }
+ s.Actions = append(s.Actions, a)
+ err = db.DBconn.Insert(v.db.StoreName, key, nil, v.db.TagState, s)
if err != nil {
- log.Warn("Error removing AppContext from Cluster document", log.Fields{
- "cluster-provider": clusterProvider,
- "cluster": cluster,
- })
+ return pkgerrors.Wrap(err, "Error updating the stateInfo of cluster: "+cluster)
}
+
return nil
}
+
+/*
+NetworkIntentsStatus takes in cluster provider, cluster and query parameters.
+This method is responsible obtaining the status of
+the cluster network intents, which is made available in the appcontext
+*/
+func (c SchedulerClient) NetworkIntentsStatus(clusterProvider, cluster, qInstance, qType, qOutput string, qApps, qClusters, qResources []string) (ClusterStatus, error) {
+
+ s, err := clusterPkg.NewClusterClient().GetClusterState(clusterProvider, cluster)
+ if err != nil {
+ return ClusterStatus{}, pkgerrors.Wrap(err, "cluster state not found")
+ }
+
+ // Prepare the apps list (just one hardcoded value)
+ allApps := make([]string, 0)
+ allApps = append(allApps, nettypes.CONTEXT_CLUSTER_APP)
+
+ statusResponse, err := status.PrepareStatusResult(s, allApps, qInstance, qType, qOutput, qApps, qClusters, qResources)
+ if err != nil {
+ return ClusterStatus{}, err
+ }
+ statusResponse.Name = clusterProvider + "+" + cluster
+ clStatus := ClusterStatus{
+ StatusResult: statusResponse,
+ }
+
+ return clStatus, nil
+}