diff options
Diffstat (limited to 'src/rsync/pkg/context/context.go')
-rw-r--r-- | src/rsync/pkg/context/context.go | 621 |
1 files changed, 207 insertions, 414 deletions
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index 7e0fce3c..cc7773b8 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -17,503 +17,296 @@ package context import ( + "context" "encoding/json" "fmt" "log" "strings" - "sync" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" + kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" + connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" pkgerrors "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) type CompositeAppContext struct { - cid interface{} - appsorder string - appsdependency string - appsmap []instMap + cid interface{} } -type clusterInfo struct { - name string - resorder string - resdependency string - ressmap []instMap -} -type instMap struct { - name string - depinfo string - status string - rerr error - clusters []clusterInfo -} - -func getInstMap(order string, dependency string, level string) ([]instMap, error) { - - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } - - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) - - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) - - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } - - return instmap, nil -} - -func deleteResource(clustername string, resname string, respath string) error { - k8sClient := app.KubernetesClient{} - err := k8sClient.Init(clustername, resname) +func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) { + var byteRes []byte + rh, err := ac.GetResourceHandle(app, cluster, name) if err != nil { - log.Println("Init failed: " + err.Error()) - return err + return nil, err } - - var c con.KubernetesConnector - c = &k8sClient - var gp res.Resource - err = gp.Delete(respath, resname, "default", c) + resval, err := ac.GetValue(rh) if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) - return err + return nil, err } - log.Println("Resource succesfully deleted", resname) - return nil - + if resval != "" { + result := strings.Split(name, "+") + if result[0] == "" { + return nil, pkgerrors.Errorf("Resource name is nil %s:", name) + } + byteRes = []byte(fmt.Sprintf("%v", resval.(interface{}))) + } else { + return nil, pkgerrors.Errorf("Resource value is nil %s", name) + } + return byteRes, nil } -func createResource(clustername string, resname string, respath string, label string) error { - k8sClient := app.KubernetesClient{} - err := k8sClient.Init(clustername, resname) +func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { + res, err := getRes(ac, name, app, cluster) if err != nil { - log.Println("Client init failed: " + err.Error()) return err } - - var c con.KubernetesConnector - c = &k8sClient - var gp res.Resource - _, err = gp.Create(respath, "default", label, c) - if err != nil { - log.Println("Create failed: " + err.Error() + resname) + if err := c.Delete(res); err != nil { + logutils.Error("Failed to delete res", logutils.Fields{ + "error": err, + "resource": name, + }) return err } - log.Println("Resource succesfully created", resname) + logutils.Info("Deleted::", logutils.Fields{ + "cluster": cluster, + "resource": name, + }) return nil - } -func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { - - rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) +func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { + res, err := getRes(ac, name, app, cluster) if err != nil { return err } - - resval, err := ac.GetValue(rh) + //Decode the yaml to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err = utils.DecodeYAMLData(string(res), unstruct) if err != nil { - return err + return pkgerrors.Wrap(err, "Decode deployment object error") } - if resval != "" { - result := strings.Split(resmap.name, "+") - if result[0] == "" { - return pkgerrors.Errorf("Resource name is nil") - } - err = deleteResource(clustername, result[0], resval.(string)) - if err != nil { - return err - } - } else { - return pkgerrors.Errorf("Resource value is nil") + //Add the tracking label to all resources created here + labels := unstruct.GetLabels() + //Check if labels exist for this object + if labels == nil { + labels = map[string]string{} } + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label + unstruct.SetLabels(labels) - return nil - -} - -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { - rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) + // This checks if the resource we are creating has a podSpec in it + // Eg: Deployment, StatefulSet, Job etc.. + // If a PodSpec is found, the label will be added to it too. + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + utils.TagPodsIfPresent(unstruct, label) + b, err := unstruct.MarshalJSON() if err != nil { + logutils.Error("Failed to MarshalJSON", logutils.Fields{ + "error": err, + "resource": name, + }) return err } - - resval, err := ac.GetValue(rh) - if err != nil { + if err := c.Apply(b); err != nil { + logutils.Error("Failed to apply res", logutils.Fields{ + "error": err, + "resource": name, + }) return err } - - if resval != "" { - result := strings.Split(resmap.name, "+") - if result[0] == "" { - return pkgerrors.Errorf("Resource name is nil") - } - err = createResource(clustername, result[0], resval.(string), label) - if err != nil { - return err - } - } else { - return pkgerrors.Errorf("Resource value is nil") - } - + logutils.Info("Installed::", logutils.Fields{ + "cluster": cluster, + "resource": name, + }) return nil - } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <-chans[index] - if c != index { - ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) - for j := 0; j < len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } - } - return nil +func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { -} - -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - cid, _ := ac.GetCompositeAppHandle() - - results := strings.Split(cid.(string), "/") - label := results[2] + "-" + appname - - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <-chans[index] - if c != index { - ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) - for j := 0; j < len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) + b, err := status.GetStatusCR(label) + if err != nil { + logutils.Error("Failed to get status CR for installing", logutils.Fields{ + "error": err, + "label": label, + }) + return err } - wg.Wait() - for k := 0; k < len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } + if err = c.Apply(b); err != nil { + logutils.Error("Failed to apply status tracker", logutils.Fields{ + "error": err, + "cluster": cluster, + "app": app, + "label": label, + }) + return err } + logutils.Info("Status tracker installed::", logutils.Fields{ + "cluster": cluster, + "app": app, + "label": label, + }) return nil - } -func terminateApp(ac appcontext.AppContext, appmap instMap) error { - - for i := 0; i < len(appmap.clusters); i++ { - err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) - if err != nil { - return err - } +func deleteStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { + b, err := status.GetStatusCR(label) + if err != nil { + logutils.Error("Failed to get status CR for deleting", logutils.Fields{ + "error": err, + "label": label, + }) + return err } - log.Println("Termination of app done: " + appmap.name) - - return nil - -} - -func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - - for i := 0; i < len(appmap.clusters); i++ { - err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) - if err != nil { - return err - } - err = status.StartClusterWatcher(appmap.clusters[i].name) - if err != nil { - log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) - } + if err = c.Delete(b); err != nil { + logutils.Error("Failed to delete res", logutils.Fields{ + "error": err, + "app": app, + "label": label, + }) + return err } - log.Println("Instantiation of app done: " + appmap.name) + logutils.Info("Status tracker deleted::", logutils.Fields{ + "cluster": cluster, + "app": app, + "label": label, + }) return nil - } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(appsmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(appsmap); i++ { - wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { - appsmap[index].status = "start" - } else { - appsmap[index].status = "waiting" - c := <-chans[index] - if c != index { - appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - appsmap[index].status = "start" - } - appsmap[index].rerr = instantiateApp(ac, appsmap[index]) - appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) - for j := 0; j < len(appsmap); j++ { - if appsmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(appsmap); k++ { - if appsmap[k].rerr != nil { - return pkgerrors.Errorf("error during apps instantiation") - } - } - return nil +type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error -} +type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { +func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error { ac := appcontext.AppContext{} - + g, _ := errgroup.WithContext(context.Background()) _, err := ac.LoadAppContext(cid) if err != nil { return err } - instca.cid = cid - - appsorder, err := ac.GetAppInstruction("order") + appsOrder, err := ac.GetAppInstruction("order") if err != nil { return err } - instca.appsorder = appsorder.(string) - appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } - instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") - if err != nil { - return err - } - - for j := 0; j < len(instca.appsmap); j++ { - clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) - if err != nil { - return err - } - instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k := 0; k < len(clusternames); k++ { - instca.appsmap[j].clusters[k].name = clusternames[k] - resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + logutils.Info("appsorder ", logutils.Fields{ + "appsorder": appsOrder, + "string": appList, + }) + id, _ := ac.GetCompositeAppHandle() + + for _, app := range appList["apporder"] { + + appName := app + results := strings.Split(id.(string), "/") + label := results[2] + "-" + app + g.Go(func() error { + clusterNames, err := ac.GetClusterNames(appName) if err != nil { return err } - instca.appsmap[j].clusters[k].resorder = resorder.(string) - - resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") - if err != nil { - return err + rg, _ := errgroup.WithContext(context.Background()) + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + err = status.StartClusterWatcher(cluster) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) + } + rg.Go(func() error { + c, err := con.GetClient(cluster) + if err != nil { + logutils.Error("Error in creating kubeconfig client", logutils.Fields{ + "error": err, + "cluster": cluster, + "appName": appName, + }) + return err + } + resorder, err := ac.GetResourceInstruction(appName, cluster, "order") + if err != nil { + logutils.Error("Resorder error ", logutils.Fields{"error": err}) + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for i, res := range aov["resorder"] { + err = f(ac, c, res, appName, cluster, label) + if err != nil { + logutils.Error("Error in resource %s: %v", logutils.Fields{ + "error": err, + "cluster": cluster, + "resource": res, + }) + if breakonError { + // handle status tracking before exiting if at least one resource got handled + if i > 0 { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } + } + return err + } + } + } + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } + return nil + }) } - instca.appsmap[j].clusters[k].resdependency = resdependency.(string) - - instca.appsmap[j].clusters[k].ressmap, err = getInstMap( - instca.appsmap[j].clusters[k].resorder, - instca.appsmap[j].clusters[k].resdependency, "res") - if err != nil { + if err := rg.Wait(); err != nil { + logutils.Error("Encountered error in App cluster", logutils.Fields{ + "error": err, + }) return err } - } + return nil + }) } - err = instantiateApps(ac, instca.appsmap) - if err != nil { + if err := g.Wait(); err != nil { + logutils.Error("Encountered error", logutils.Fields{ + "error": err, + }) return err } - return nil } -// Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(appsmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(appsmap); i++ { - wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { - appsmap[index].status = "start" - } else { - appsmap[index].status = "waiting" - c := <-chans[index] - if c != index { - appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - appsmap[index].status = "start" - } - appsmap[index].rerr = terminateApp(ac, appsmap[index]) - appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) - for j := 0; j < len(appsmap); j++ { - if appsmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(appsmap); k++ { - if appsmap[k].rerr != nil { - return pkgerrors.Errorf("error during apps instantiation") - } +// InstantiateComApp Instantiate Apps in Composite App +func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { + con := connector.Init(cid) + err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true) + if err != nil { + logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err}) + return err } + //Cleanup + con.RemoveClient() return nil - } -// Delete all the resources for a given context +// TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - ac := appcontext.AppContext{} - - _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } - instca.cid = cid - - appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } - instca.appsorder = appsorder.(string) - appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } - instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + con := connector.Init(cid) + err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false) if err != nil { + logutils.Error("TerminateComApp unsuccessful", logutils.Fields{ + "error": err, + }) return err } - - for j := 0; j < len(instca.appsmap); j++ { - clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) - if err != nil { - return err - } - instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k := 0; k < len(clusternames); k++ { - instca.appsmap[j].clusters[k].name = clusternames[k] - resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") - if err != nil { - return err - } - instca.appsmap[j].clusters[k].resorder = resorder.(string) - - resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") - if err != nil { - return err - } - instca.appsmap[j].clusters[k].resdependency = resdependency.(string) - - instca.appsmap[j].clusters[k].ressmap, err = getInstMap( - instca.appsmap[j].clusters[k].resorder, - instca.appsmap[j].clusters[k].resdependency, "res") - if err != nil { - return err - } - } - } - err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } - + //Cleanup + con.RemoveClient() return nil - } |