summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/context/context.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rsync/pkg/context/context.go')
-rw-r--r--src/rsync/pkg/context/context.go621
1 files changed, 207 insertions, 414 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index 7e0fce3c..cc7773b8 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -17,503 +17,296 @@
package context
import (
+ "context"
"encoding/json"
"fmt"
"log"
"strings"
- "sync"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
- con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
- res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
+ kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
+ connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
status "github.com/onap/multicloud-k8s/src/rsync/pkg/status"
pkgerrors "github.com/pkg/errors"
+ "golang.org/x/sync/errgroup"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
type CompositeAppContext struct {
- cid interface{}
- appsorder string
- appsdependency string
- appsmap []instMap
+ cid interface{}
}
-type clusterInfo struct {
- name string
- resorder string
- resdependency string
- ressmap []instMap
-}
-type instMap struct {
- name string
- depinfo string
- status string
- rerr error
- clusters []clusterInfo
-}
-
-func getInstMap(order string, dependency string, level string) ([]instMap, error) {
-
- if order == "" {
- return nil, pkgerrors.Errorf("Not a valid order value")
- }
- if dependency == "" {
- return nil, pkgerrors.Errorf("Not a valid dependency value")
- }
- if !(level == "app" || level == "res") {
- return nil, pkgerrors.Errorf("Not a valid level name given to create map")
- }
-
- var aov map[string]interface{}
- json.Unmarshal([]byte(order), &aov)
-
- s := fmt.Sprintf("%vorder", level)
- appso := aov[s].([]interface{})
- var instmap = make([]instMap, len(appso))
-
- var adv map[string]interface{}
- json.Unmarshal([]byte(dependency), &adv)
- s = fmt.Sprintf("%vdependency", level)
- appsd := adv[s].(map[string]interface{})
- for i, u := range appso {
- instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
- }
-
- return instmap, nil
-}
-
-func deleteResource(clustername string, resname string, respath string) error {
- k8sClient := app.KubernetesClient{}
- err := k8sClient.Init(clustername, resname)
+func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) {
+ var byteRes []byte
+ rh, err := ac.GetResourceHandle(app, cluster, name)
if err != nil {
- log.Println("Init failed: " + err.Error())
- return err
+ return nil, err
}
-
- var c con.KubernetesConnector
- c = &k8sClient
- var gp res.Resource
- err = gp.Delete(respath, resname, "default", c)
+ resval, err := ac.GetValue(rh)
if err != nil {
- log.Println("Delete resource failed: " + err.Error() + resname)
- return err
+ return nil, err
}
- log.Println("Resource succesfully deleted", resname)
- return nil
-
+ if resval != "" {
+ result := strings.Split(name, "+")
+ if result[0] == "" {
+ return nil, pkgerrors.Errorf("Resource name is nil %s:", name)
+ }
+ byteRes = []byte(fmt.Sprintf("%v", resval.(interface{})))
+ } else {
+ return nil, pkgerrors.Errorf("Resource value is nil %s", name)
+ }
+ return byteRes, nil
}
-func createResource(clustername string, resname string, respath string, label string) error {
- k8sClient := app.KubernetesClient{}
- err := k8sClient.Init(clustername, resname)
+func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
+ res, err := getRes(ac, name, app, cluster)
if err != nil {
- log.Println("Client init failed: " + err.Error())
return err
}
-
- var c con.KubernetesConnector
- c = &k8sClient
- var gp res.Resource
- _, err = gp.Create(respath, "default", label, c)
- if err != nil {
- log.Println("Create failed: " + err.Error() + resname)
+ if err := c.Delete(res); err != nil {
+ logutils.Error("Failed to delete res", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
return err
}
- log.Println("Resource succesfully created", resname)
+ logutils.Info("Deleted::", logutils.Fields{
+ "cluster": cluster,
+ "resource": name,
+ })
return nil
-
}
-func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
-
- rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
+ res, err := getRes(ac, name, app, cluster)
if err != nil {
return err
}
-
- resval, err := ac.GetValue(rh)
+ //Decode the yaml to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err = utils.DecodeYAMLData(string(res), unstruct)
if err != nil {
- return err
+ return pkgerrors.Wrap(err, "Decode deployment object error")
}
- if resval != "" {
- result := strings.Split(resmap.name, "+")
- if result[0] == "" {
- return pkgerrors.Errorf("Resource name is nil")
- }
- err = deleteResource(clustername, result[0], resval.(string))
- if err != nil {
- return err
- }
- } else {
- return pkgerrors.Errorf("Resource value is nil")
+ //Add the tracking label to all resources created here
+ labels := unstruct.GetLabels()
+ //Check if labels exist for this object
+ if labels == nil {
+ labels = map[string]string{}
}
+ //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ labels["emco/deployment-id"] = label
+ unstruct.SetLabels(labels)
- return nil
-
-}
-
-func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error {
- rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+ // This checks if the resource we are creating has a podSpec in it
+ // Eg: Deployment, StatefulSet, Job etc..
+ // If a PodSpec is found, the label will be added to it too.
+ //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ utils.TagPodsIfPresent(unstruct, label)
+ b, err := unstruct.MarshalJSON()
if err != nil {
+ logutils.Error("Failed to MarshalJSON", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
return err
}
-
- resval, err := ac.GetValue(rh)
- if err != nil {
+ if err := c.Apply(b); err != nil {
+ logutils.Error("Failed to apply res", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
return err
}
-
- if resval != "" {
- result := strings.Split(resmap.name, "+")
- if result[0] == "" {
- return pkgerrors.Errorf("Resource name is nil")
- }
- err = createResource(clustername, result[0], resval.(string), label)
- if err != nil {
- return err
- }
- } else {
- return pkgerrors.Errorf("Resource value is nil")
- }
-
+ logutils.Info("Installed::", logutils.Fields{
+ "cluster": cluster,
+ "resource": name,
+ })
return nil
-
}
-func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
- for j := 0; j < len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources termination")
- }
- }
- return nil
+func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
-}
-
-func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- cid, _ := ac.GetCompositeAppHandle()
-
- results := strings.Split(cid.(string), "/")
- label := results[2] + "-" + appname
-
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
- for j := 0; j < len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
+ b, err := status.GetStatusCR(label)
+ if err != nil {
+ logutils.Error("Failed to get status CR for installing", logutils.Fields{
+ "error": err,
+ "label": label,
+ })
+ return err
}
- wg.Wait()
- for k := 0; k < len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources instantiation")
- }
+ if err = c.Apply(b); err != nil {
+ logutils.Error("Failed to apply status tracker", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "app": app,
+ "label": label,
+ })
+ return err
}
+ logutils.Info("Status tracker installed::", logutils.Fields{
+ "cluster": cluster,
+ "app": app,
+ "label": label,
+ })
return nil
-
}
-func terminateApp(ac appcontext.AppContext, appmap instMap) error {
-
- for i := 0; i < len(appmap.clusters); i++ {
- err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
- if err != nil {
- return err
- }
+func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
+ b, err := status.GetStatusCR(label)
+ if err != nil {
+ logutils.Error("Failed to get status CR for deleting", logutils.Fields{
+ "error": err,
+ "label": label,
+ })
+ return err
}
- log.Println("Termination of app done: " + appmap.name)
-
- return nil
-
-}
-
-func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
-
- for i := 0; i < len(appmap.clusters); i++ {
- err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
- if err != nil {
- return err
- }
- err = status.StartClusterWatcher(appmap.clusters[i].name)
- if err != nil {
- log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err)
- }
+ if err = c.Delete(b); err != nil {
+ logutils.Error("Failed to delete res", logutils.Fields{
+ "error": err,
+ "app": app,
+ "label": label,
+ })
+ return err
}
- log.Println("Instantiation of app done: " + appmap.name)
+ logutils.Info("Status tracker deleted::", logutils.Fields{
+ "cluster": cluster,
+ "app": app,
+ "label": label,
+ })
return nil
-
}
-func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(appsmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(appsmap); i++ {
- wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
- appsmap[index].status = "start"
- } else {
- appsmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- appsmap[index].status = "start"
- }
- appsmap[index].rerr = instantiateApp(ac, appsmap[index])
- appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
- for j := 0; j < len(appsmap); j++ {
- if appsmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(appsmap); k++ {
- if appsmap[k].rerr != nil {
- return pkgerrors.Errorf("error during apps instantiation")
- }
- }
- return nil
+type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
-}
+type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
-func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error {
ac := appcontext.AppContext{}
-
+ g, _ := errgroup.WithContext(context.Background())
_, err := ac.LoadAppContext(cid)
if err != nil {
return err
}
- instca.cid = cid
-
- appsorder, err := ac.GetAppInstruction("order")
+ appsOrder, err := ac.GetAppInstruction("order")
if err != nil {
return err
}
- instca.appsorder = appsorder.(string)
- appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
- instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
- if err != nil {
- return err
- }
-
- for j := 0; j < len(instca.appsmap); j++ {
- clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k := 0; k < len(clusternames); k++ {
- instca.appsmap[j].clusters[k].name = clusternames[k]
- resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
+ var appList map[string][]string
+ json.Unmarshal([]byte(appsOrder.(string)), &appList)
+ logutils.Info("appsorder ", logutils.Fields{
+ "appsorder": appsOrder,
+ "string": appList,
+ })
+ id, _ := ac.GetCompositeAppHandle()
+
+ for _, app := range appList["apporder"] {
+
+ appName := app
+ results := strings.Split(id.(string), "/")
+ label := results[2] + "-" + app
+ g.Go(func() error {
+ clusterNames, err := ac.GetClusterNames(appName)
if err != nil {
return err
}
- instca.appsmap[j].clusters[k].resorder = resorder.(string)
-
- resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
- if err != nil {
- return err
+ rg, _ := errgroup.WithContext(context.Background())
+ for k := 0; k < len(clusterNames); k++ {
+ cluster := clusterNames[k]
+ err = status.StartClusterWatcher(cluster)
+ if err != nil {
+ log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
+ }
+ rg.Go(func() error {
+ c, err := con.GetClient(cluster)
+ if err != nil {
+ logutils.Error("Error in creating kubeconfig client", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "appName": appName,
+ })
+ return err
+ }
+ resorder, err := ac.GetResourceInstruction(appName, cluster, "order")
+ if err != nil {
+ logutils.Error("Resorder error ", logutils.Fields{"error": err})
+ return err
+ }
+ var aov map[string][]string
+ json.Unmarshal([]byte(resorder.(string)), &aov)
+ for i, res := range aov["resorder"] {
+ err = f(ac, c, res, appName, cluster, label)
+ if err != nil {
+ logutils.Error("Error in resource %s: %v", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "resource": res,
+ })
+ if breakonError {
+ // handle status tracking before exiting if at least one resource got handled
+ if i > 0 {
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
+ }
+ return err
+ }
+ }
+ }
+ serr := sfn(c, appName, cluster, label)
+ if serr != nil {
+ logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+ }
+ return nil
+ })
}
- instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
-
- instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
- instca.appsmap[j].clusters[k].resorder,
- instca.appsmap[j].clusters[k].resdependency, "res")
- if err != nil {
+ if err := rg.Wait(); err != nil {
+ logutils.Error("Encountered error in App cluster", logutils.Fields{
+ "error": err,
+ })
return err
}
- }
+ return nil
+ })
}
- err = instantiateApps(ac, instca.appsmap)
- if err != nil {
+ if err := g.Wait(); err != nil {
+ logutils.Error("Encountered error", logutils.Fields{
+ "error": err,
+ })
return err
}
-
return nil
}
-// Delete all the apps
-func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(appsmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(appsmap); i++ {
- wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
- appsmap[index].status = "start"
- } else {
- appsmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- appsmap[index].status = "start"
- }
- appsmap[index].rerr = terminateApp(ac, appsmap[index])
- appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
- for j := 0; j < len(appsmap); j++ {
- if appsmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(appsmap); k++ {
- if appsmap[k].rerr != nil {
- return pkgerrors.Errorf("error during apps instantiation")
- }
+// InstantiateComApp Instantiate Apps in Composite App
+func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+ con := connector.Init(cid)
+ err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true)
+ if err != nil {
+ logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
+ return err
}
+ //Cleanup
+ con.RemoveClient()
return nil
-
}
-// Delete all the resources for a given context
+// TerminateComApp Terminates Apps in Composite App
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
- ac := appcontext.AppContext{}
-
- _, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
- instca.cid = cid
-
- appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
- instca.appsorder = appsorder.(string)
- appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
- instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ con := connector.Init(cid)
+ err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false)
if err != nil {
+ logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
+ "error": err,
+ })
return err
}
-
- for j := 0; j < len(instca.appsmap); j++ {
- clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k := 0; k < len(clusternames); k++ {
- instca.appsmap[j].clusters[k].name = clusternames[k]
- resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters[k].resorder = resorder.(string)
-
- resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
-
- instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
- instca.appsmap[j].clusters[k].resorder,
- instca.appsmap[j].clusters[k].resdependency, "res")
- if err != nil {
- return err
- }
- }
- }
- err = terminateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
-
+ //Cleanup
+ con.RemoveClient()
return nil
-
}