aboutsummaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg
diff options
context:
space:
mode:
authorManjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>2020-06-19 17:54:58 +0000
committerManjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>2020-06-24 22:58:20 +0000
commit81c8ffaa3046245caf3aff5bffe2b971d497ac3d (patch)
tree92804e8f522fa2ec352b4d3bea16d8c2abc645ac /src/rsync/pkg
parent7b860ae60bf9686b449ab2fe3f18c33944bdd71c (diff)
Instantiation and termination of a given context implementation.
Issue-ID: MULTICLOUD-1005 Signed-off-by: Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com> Change-Id: I60e11aaad97b60efc24a02866dc0e580507e5296
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r--src/rsync/pkg/app/client.go154
-rw-r--r--src/rsync/pkg/connector/connector.go96
-rw-r--r--src/rsync/pkg/context/context.go547
-rw-r--r--src/rsync/pkg/grpc/installappserver/installappserver.go27
-rw-r--r--src/rsync/pkg/internal/config/config.go128
-rw-r--r--src/rsync/pkg/internal/utils.go59
-rw-r--r--src/rsync/pkg/resource/resource.go129
7 files changed, 1131 insertions, 9 deletions
diff --git a/src/rsync/pkg/app/client.go b/src/rsync/pkg/app/client.go
new file mode 100644
index 00000000..fb57d46b
--- /dev/null
+++ b/src/rsync/pkg/app/client.go
@@ -0,0 +1,154 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+ "os"
+ "strings"
+ "time"
+ "encoding/base64"
+
+ pkgerrors "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/client-go/discovery/cached/disk"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/restmapper"
+ "k8s.io/client-go/tools/clientcmd"
+
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
+)
+
+const basePath string = "/tmp/rsync/"
+
+// KubernetesClient encapsulates the different clients' interfaces
+// we need when interacting with a Kubernetes cluster
+type KubernetesClient struct {
+ clientSet kubernetes.Interface
+ dynamicClient dynamic.Interface
+ discoverClient *disk.CachedDiscoveryClient
+ restMapper meta.RESTMapper
+ instanceID string
+}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+func (k *KubernetesClient) getKubeConfig(clustername string, id string) (string, error) {
+
+ if !strings.Contains(clustername, "+") {
+ return "", pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return "", pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+ return "", pkgerrors.New("Get kubeconfig failed")
+ }
+
+ var kubeConfigPath string = basePath + id + "/" + clustername + "/"
+
+ if _, err := os.Stat(kubeConfigPath); os.IsNotExist(err) {
+ err = os.MkdirAll(kubeConfigPath, 0755)
+ if err != nil {
+ return "", err
+ }
+ }
+ kubeConfigPath = kubeConfigPath + "config"
+
+ f, err := os.Create(kubeConfigPath)
+ defer f.Close()
+ if err != nil {
+ return "", err
+ }
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return "", err
+ }
+ _, err = f.Write(dec)
+ if err != nil {
+ return "", err
+ }
+
+ return kubeConfigPath, nil
+}
+
+// init loads the Kubernetes configuation values stored into the local configuration file
+func (k *KubernetesClient) Init(clustername string, iid string) error {
+ if clustername == "" {
+ return pkgerrors.New("Cloudregion is empty")
+ }
+
+ if iid == "" {
+ return pkgerrors.New("Instance ID is empty")
+ }
+
+ k.instanceID = iid
+
+ configPath, err := k.getKubeConfig(clustername, iid)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get kubeconfig file")
+ }
+
+ //Remove kubeconfigfile after the clients are created
+ defer os.Remove(configPath)
+
+ config, err := clientcmd.BuildConfigFromFlags("", configPath)
+ if err != nil {
+ return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
+ }
+
+ k.clientSet, err = kubernetes.NewForConfig(config)
+ if err != nil {
+ return err
+ }
+
+ k.dynamicClient, err = dynamic.NewForConfig(config)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Creating dynamic client")
+ }
+
+ k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Creating discovery client")
+ }
+
+ k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
+
+ return nil
+}
+
+//GetMapper returns the RESTMapper that was created for this client
+func (k *KubernetesClient) GetMapper() meta.RESTMapper {
+ return k.restMapper
+}
+
+//GetDynamicClient returns the dynamic client that is needed for
+//unstructured REST calls to the apiserver
+func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
+ return k.dynamicClient
+}
+
+// GetStandardClient returns the standard client that can be used to handle
+// standard kubernetes kinds
+func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
+ return k.clientSet
+}
+
+//GetInstanceID returns the instanceID that is injected into all the
+//resources created by the plugin
+func (k *KubernetesClient) GetInstanceID() string {
+ return k.instanceID
+}
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go
new file mode 100644
index 00000000..6e17f87a
--- /dev/null
+++ b/src/rsync/pkg/connector/connector.go
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector
+
+import (
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
+)
+
+// KubernetesConnector is an interface that is expected to be implemented
+// by any code that calls the plugin framework functions.
+// It implements methods that are needed by the plugins to get Kubernetes
+// clients and other information needed to interface with Kubernetes
+type KubernetesConnector interface {
+ //GetMapper returns the RESTMapper that was created for this client
+ GetMapper() meta.RESTMapper
+
+ //GetDynamicClient returns the dynamic client that is needed for
+ //unstructured REST calls to the apiserver
+ GetDynamicClient() dynamic.Interface
+
+ // GetStandardClient returns the standard client that can be used to handle
+ // standard kubernetes kinds
+ GetStandardClient() kubernetes.Interface
+
+ //GetInstanceID returns the InstanceID for tracking during creation
+ GetInstanceID() string
+}
+
+// Reference is the interface that is implemented
+type Reference interface {
+ //Create a kubernetes resource described by the yaml in yamlFilePath
+ Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+ //Delete a kubernetes resource described in the provided namespace
+ Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
+}
+
+// TagPodsIfPresent finds the PodTemplateSpec from any workload
+// object that contains it and changes the spec to include the tag label
+func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
+
+ spec, ok := unstruct.Object["spec"].(map[string]interface{})
+ if !ok {
+ log.Println("Error converting spec to map")
+ return
+ }
+
+ template, ok := spec["template"].(map[string]interface{})
+ if !ok {
+ log.Println("Error converting template to map")
+ return
+ }
+
+ //Attempt to convert the template to a podtemplatespec.
+ //This is to check if we have any pods being created.
+ podTemplateSpec := &corev1.PodTemplateSpec{}
+ err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec)
+ if err != nil {
+ log.Println("Did not find a podTemplateSpec: " + err.Error())
+ return
+ }
+
+ labels := podTemplateSpec.GetLabels()
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels[config.GetConfiguration().KubernetesLabelName] = tag
+ podTemplateSpec.SetLabels(labels)
+
+ updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
+
+ //Set the label
+ spec["template"] = updatedTemplate
+}
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
new file mode 100644
index 00000000..67e589c9
--- /dev/null
+++ b/src/rsync/pkg/context/context.go
@@ -0,0 +1,547 @@
+/*
+ * Copyright 2020 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package context
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "sync"
+ "strings"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+ pkgerrors "github.com/pkg/errors"
+ res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+ con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+)
+
+type CompositeAppContext struct {
+ cid interface{}
+ appsorder string
+ appsdependency string
+ appsmap []instMap
+}
+type clusterInfo struct {
+ name string
+ resorder string
+ resdependency string
+ ressmap []instMap
+}
+type instMap struct {
+ name string
+ depinfo string
+ status string
+ rerr error
+ clusters []clusterInfo
+}
+
+const basePath string = "/tmp/rsync/"
+
+func getInstMap(order string, dependency string, level string) ([]instMap, error) {
+
+ if order == "" {
+ return nil, pkgerrors.Errorf("Not a valid order value")
+ }
+ if dependency == "" {
+ return nil, pkgerrors.Errorf("Not a valid dependency value")
+ }
+
+ if !(level == "app" || level == "res") {
+ return nil, pkgerrors.Errorf("Not a valid level name given to create map")
+ }
+
+
+ var aov map[string]interface{}
+ json.Unmarshal([]byte(order), &aov)
+
+ s := fmt.Sprintf("%vorder", level)
+ appso := aov[s].([]interface{})
+ var instmap = make([]instMap, len(appso))
+
+ var adv map[string]interface{}
+ json.Unmarshal([]byte(dependency), &adv)
+ s = fmt.Sprintf("%vdependency", level)
+ appsd := adv[s].(map[string]interface{})
+ for i, u := range appso {
+ instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
+ }
+
+ return instmap, nil
+}
+
+func deleteResource(clustername string, resname string, respath string) error {
+ k8sClient := app.KubernetesClient{}
+ err := k8sClient.Init(clustername, resname)
+ if err != nil {
+ log.Println("Init failed: " + err.Error())
+ return err
+ }
+
+ var c con.KubernetesConnector
+ c = &k8sClient
+ var gp res.Resource
+ err = gp.Delete(respath, resname, "default", c)
+ if err != nil {
+ log.Println("Delete resource failed: " + err.Error())
+ return err
+ }
+ log.Println("Resource succesfully deleted")
+ return nil
+
+}
+
+func createResource(clustername string, resname string, respath string) error {
+ k8sClient := app.KubernetesClient{}
+ err := k8sClient.Init(clustername, resname)
+ if err != nil {
+ log.Println("Client init failed: " + err.Error())
+ return err
+ }
+
+ var c con.KubernetesConnector
+ c = &k8sClient
+ var gp res.Resource
+ _, err = gp.Create(respath,"default", c)
+ if err != nil {
+ log.Println("Create failed: " + err.Error())
+ return err
+ }
+ log.Println("Resource succesfully created")
+ return nil
+
+}
+
+func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+ var resPath string = basePath + appname + "/" + clustername + "/resources/"
+ rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+ if err != nil {
+ return err
+ }
+
+ resval, err := ac.GetValue(rh)
+ if err != nil {
+ return err
+ }
+
+ if resval != "" {
+ if _, err := os.Stat(resPath); os.IsNotExist(err) {
+ err = os.MkdirAll(resPath, 0755)
+ if err != nil {
+ return err
+ }
+ }
+ resPath := resPath + resmap.name + ".yaml"
+ f, err := os.Create(resPath)
+ defer f.Close()
+ if err != nil {
+ return err
+ }
+ _, err = f.WriteString(resval.(string))
+ if err != nil {
+ return err
+ }
+ result := strings.Split(resmap.name, "+")
+ if result[0] == "" {
+ return pkgerrors.Errorf("Resource name is nil")
+ }
+ err = deleteResource(clustername, result[0], resPath)
+ if err != nil {
+ return err
+ }
+ //defer os.Remove(resPath)
+ } else {
+ return pkgerrors.Errorf("Resource value is nil")
+ }
+
+ return nil
+
+}
+
+func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+ var resPath string = basePath + appname + "/" + clustername + "/resources/"
+ rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+ if err != nil {
+ return err
+ }
+
+ resval, err := ac.GetValue(rh)
+ if err != nil {
+ return err
+ }
+
+ if resval != "" {
+ if _, err := os.Stat(resPath); os.IsNotExist(err) {
+ err = os.MkdirAll(resPath, 0755)
+ if err != nil {
+ return err
+ }
+ }
+ resPath := resPath + resmap.name + ".yaml"
+ f, err := os.Create(resPath)
+ defer f.Close()
+ if err != nil {
+ return err
+ }
+ _, err = f.WriteString(resval.(string))
+ if err != nil {
+ return err
+ }
+ result := strings.Split(resmap.name, "+")
+ if result[0] == "" {
+ return pkgerrors.Errorf("Resource name is nil")
+ }
+ err = createResource(clustername, result[0], resPath)
+ if err != nil {
+ return err
+ }
+ //defer os.Remove(resPath)
+ } else {
+ return pkgerrors.Errorf("Resource value is nil")
+ }
+
+ return nil
+
+}
+
+func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i:=0; i<len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <- chans[index]
+ if c != index {
+ ressmap[index].status = "error"
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ wg.Done()
+ return
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
+ for j:=0; j<len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k:=0; k<len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources termination")
+ }
+ }
+ return nil
+
+}
+
+func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(ressmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i:=0; i<len(ressmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if ressmap[index].depinfo == "go" {
+ ressmap[index].status = "start"
+ } else {
+ ressmap[index].status = "waiting"
+ c := <- chans[index]
+ if c != index {
+ ressmap[index].status = "error"
+ ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ wg.Done()
+ return
+ }
+ ressmap[index].status = "start"
+ }
+ ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername)
+ ressmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
+ for j:=0; j<len(ressmap); j++ {
+ if ressmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k:=0; k<len(ressmap); k++ {
+ if ressmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during resources instantiation")
+ }
+ }
+ return nil
+
+}
+
+func terminateApp(ac appcontext.AppContext, appmap instMap) error {
+
+ for i:=0; i<len(appmap.clusters); i++ {
+ err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
+ appmap.clusters[i].name)
+ if err != nil {
+ return err
+ }
+ }
+ log.Println("Termination of app done: " + appmap.name)
+
+ return nil
+
+}
+
+
+func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
+
+ for i:=0; i<len(appmap.clusters); i++ {
+ err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
+ appmap.clusters[i].name)
+ if err != nil {
+ return err
+ }
+ }
+ log.Println("Instantiation of app done: " + appmap.name)
+ return nil
+
+}
+
+func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(appsmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i:=0; i<len(appsmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
+ appsmap[index].status = "start"
+ } else {
+ appsmap[index].status = "waiting"
+ c := <- chans[index]
+ if c != index {
+ appsmap[index].status = "error"
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ wg.Done()
+ return
+ }
+ appsmap[index].status = "start"
+ }
+ appsmap[index].rerr = instantiateApp(ac, appsmap[index])
+ appsmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
+ for j:=0; j<len(appsmap); j++ {
+ if appsmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k:=0; k<len(appsmap); k++ {
+ if appsmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during apps instantiation")
+ }
+ }
+ return nil
+
+}
+
+func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+ ac := appcontext.AppContext{}
+
+ _, err := ac.LoadAppContext(cid)
+ if err != nil {
+ return err
+ }
+ instca.cid = cid
+
+ appsorder, err := ac.GetAppInstruction("order")
+ if err != nil {
+ return err
+ }
+ instca.appsorder = appsorder.(string)
+ appsdependency, err := ac.GetAppInstruction("dependency")
+ if err != nil {
+ return err
+ }
+ instca.appsdependency = appsdependency.(string)
+ instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
+
+ for j:=0; j<len(instca.appsmap); j++ {
+ clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
+ for k:=0; k<len(clusternames); k++ {
+ instca.appsmap[j].clusters[k].name = clusternames[k]
+ resorder, err := ac.GetResourceInstruction(
+ instca.appsmap[j].name, clusternames[k], "order")
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters[k].resorder = resorder.(string)
+
+ resdependency, err := ac.GetResourceInstruction(
+ instca.appsmap[j].name, clusternames[k], "dependency")
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
+
+ instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
+ instca.appsmap[j].clusters[k].resorder,
+ instca.appsmap[j].clusters[k].resdependency, "res")
+ if err != nil {
+ return err
+ }
+ }
+ }
+ err = instantiateApps(ac, instca.appsmap)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Delete all the apps
+func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
+ var wg sync.WaitGroup
+ var chans = make([]chan int, len(appsmap))
+ for l := range chans {
+ chans[l] = make(chan int)
+ }
+ for i:=0; i<len(appsmap); i++ {
+ wg.Add(1)
+ go func(index int) {
+ if appsmap[index].depinfo == "go" {
+ appsmap[index].status = "start"
+ } else {
+ appsmap[index].status = "waiting"
+ c := <- chans[index]
+ if c != index {
+ appsmap[index].status = "error"
+ appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+ wg.Done()
+ return
+ }
+ appsmap[index].status = "start"
+ }
+ appsmap[index].rerr = terminateApp(ac, appsmap[index])
+ appsmap[index].status = "done"
+ waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
+ for j:=0; j<len(appsmap); j++ {
+ if appsmap[j].depinfo == waitstr {
+ chans[j] <- j
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ for k:=0; k<len(appsmap); k++ {
+ if appsmap[k].rerr != nil {
+ return pkgerrors.Errorf("error during apps instantiation")
+ }
+ }
+ return nil
+
+}
+// Delete all the resources for a given context
+func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
+ ac := appcontext.AppContext{}
+
+ _, err := ac.LoadAppContext(cid)
+ if err != nil {
+ return err
+ }
+ instca.cid = cid
+
+ appsorder, err := ac.GetAppInstruction("order")
+ if err != nil {
+ return err
+ }
+ instca.appsorder = appsorder.(string)
+ appsdependency, err := ac.GetAppInstruction("dependency")
+ if err != nil {
+ return err
+ }
+ instca.appsdependency = appsdependency.(string)
+ instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
+ if err != nil {
+ return err
+ }
+
+ for j:=0; j<len(instca.appsmap); j++ {
+ clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
+ for k:=0; k<len(clusternames); k++ {
+ instca.appsmap[j].clusters[k].name = clusternames[k]
+ resorder, err := ac.GetResourceInstruction(
+ instca.appsmap[j].name, clusternames[k], "order")
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters[k].resorder = resorder.(string)
+
+ resdependency, err := ac.GetResourceInstruction(
+ instca.appsmap[j].name, clusternames[k], "dependency")
+ if err != nil {
+ return err
+ }
+ instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
+
+ instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
+ instca.appsmap[j].clusters[k].resorder,
+ instca.appsmap[j].clusters[k].resdependency, "res")
+ if err != nil {
+ return err
+ }
+ }
+ }
+ err = terminateApps(ac, instca.appsmap)
+ if err != nil {
+ return err
+ }
+
+ return nil
+
+}
diff --git a/src/rsync/pkg/grpc/installappserver/installappserver.go b/src/rsync/pkg/grpc/installappserver/installappserver.go
index 28b4a585..68118ade 100644
--- a/src/rsync/pkg/grpc/installappserver/installappserver.go
+++ b/src/rsync/pkg/grpc/installappserver/installappserver.go
@@ -17,10 +17,8 @@ import (
"context"
"encoding/json"
"log"
-
"github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
- //"google.golang.org/grpc/codes"
- //"google.golang.org/grpc/status"
+ con "github.com/onap/multicloud-k8s/src/rsync/pkg/context"
)
type installappServer struct {
@@ -31,10 +29,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst
installAppReq, _ := json.Marshal(req)
log.Println("GRPC Server received installAppRequest: ", string(installAppReq))
- // Insert call to Server Functionality here
- //
- //
-
+ // Try instantiate the comp app
+ instca := con.CompositeAppContext{}
+ err := instca.InstantiateComApp(req.GetAppContext())
+ if err != nil {
+ log.Println("Instantiation failed: " + err.Error())
+ err := instca.TerminateComApp(req.GetAppContext())
+ if err != nil {
+ log.Println("Termination failed: " + err.Error())
+ }
+ return &installapp.InstallAppResponse{AppContextInstalled: false}, err
+ }
return &installapp.InstallAppResponse{AppContextInstalled: true}, nil
}
@@ -43,8 +48,12 @@ func (cs *installappServer) UninstallApp(ctx context.Context, req *installapp.Un
log.Println("GRPC Server received uninstallAppRequest: ", string(uninstallAppReq))
// Try terminating the comp app here
- //
- //
+ instca := con.CompositeAppContext{}
+ err := instca.TerminateComApp(req.GetAppContext())
+ if err != nil {
+ log.Println("Termination failed: " + err.Error())
+ return &installapp.UninstallAppResponse{AppContextUninstalled: false}, err
+ }
return &installapp.UninstallAppResponse{AppContextUninstalled: true}, nil
}
diff --git a/src/rsync/pkg/internal/config/config.go b/src/rsync/pkg/internal/config/config.go
new file mode 100644
index 00000000..89f2553d
--- /dev/null
+++ b/src/rsync/pkg/internal/config/config.go
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2019 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "encoding/json"
+ "log"
+ "os"
+ "reflect"
+)
+
+// Configuration loads up all the values that are used to configure
+// backend implementations
+type Configuration struct {
+ CAFile string `json:"ca-file"`
+ ServerCert string `json:"server-cert"`
+ ServerKey string `json:"server-key"`
+ Password string `json:"password"`
+ DatabaseAddress string `json:"database-address"`
+ DatabaseType string `json:"database-type"`
+ PluginDir string `json:"plugin-dir"`
+ EtcdIP string `json:"etcd-ip"`
+ EtcdCert string `json:"etcd-cert"`
+ EtcdKey string `json:"etcd-key"`
+ EtcdCAFile string `json:"etcd-ca-file"`
+ ServicePort string `json:"service-port"`
+ KubernetesLabelName string `json:"kubernetes-label-name"`
+}
+
+// Config is the structure that stores the configuration
+var gConfig *Configuration
+
+// readConfigFile reads the specified smsConfig file to setup some env variables
+func readConfigFile(file string) (*Configuration, error) {
+ f, err := os.Open(file)
+ if err != nil {
+ return defaultConfiguration(), err
+ }
+ defer f.Close()
+
+ // Setup some defaults here
+ // If the json file has values in it, the defaults will be overwritten
+ conf := defaultConfiguration()
+
+ // Read the configuration from json file
+ decoder := json.NewDecoder(f)
+ err = decoder.Decode(conf)
+ if err != nil {
+ return conf, err
+ }
+
+ return conf, nil
+}
+
+func defaultConfiguration() *Configuration {
+ cwd, err := os.Getwd()
+ if err != nil {
+ log.Println("Error getting cwd. Using .")
+ cwd = "."
+ }
+
+ return &Configuration{
+ CAFile: "ca.cert",
+ ServerCert: "server.cert",
+ ServerKey: "server.key",
+ Password: "",
+ DatabaseAddress: "127.0.0.1",
+ DatabaseType: "mongo",
+ PluginDir: cwd,
+ EtcdIP: "127.0.0.1",
+ EtcdCert: "",
+ EtcdKey: "",
+ EtcdCAFile: "",
+ ServicePort: "9015",
+ KubernetesLabelName: "k8splugin.io/rb-instance-id",
+ }
+}
+
+// GetConfiguration returns the configuration for the app.
+// It will try to load it if it is not already loaded.
+func GetConfiguration() *Configuration {
+ if gConfig == nil {
+ conf, err := readConfigFile("k8sconfig.json")
+ if err != nil {
+ log.Println("Error loading config file. Using defaults.")
+ }
+ gConfig = conf
+ }
+
+ return gConfig
+}
+
+// SetConfigValue sets a value in the configuration
+// This is mostly used to customize the application and
+// should be used carefully.
+func SetConfigValue(key string, value string) *Configuration {
+ c := GetConfiguration()
+ if value == "" || key == "" {
+ return c
+ }
+
+ v := reflect.ValueOf(c).Elem()
+ if v.Kind() == reflect.Struct {
+ f := v.FieldByName(key)
+ if f.IsValid() {
+ if f.CanSet() {
+ if f.Kind() == reflect.String {
+ f.SetString(value)
+ }
+ }
+ }
+ }
+ return c
+}
diff --git a/src/rsync/pkg/internal/utils.go b/src/rsync/pkg/internal/utils.go
new file mode 100644
index 00000000..59ff6df8
--- /dev/null
+++ b/src/rsync/pkg/internal/utils.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "io/ioutil"
+ "os"
+ "path"
+
+ pkgerrors "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes/scheme"
+)
+
+// DecodeYAML reads a YAMl file to extract the Kubernetes object definition
+func DecodeYAML(path string, into runtime.Object) (runtime.Object, error) {
+ if _, err := os.Stat(path); err != nil {
+ if os.IsNotExist(err) {
+ return nil, pkgerrors.New("File " + path + " not found")
+ } else {
+ return nil, pkgerrors.Wrap(err, "Stat file error")
+ }
+ }
+
+ rawBytes, err := ioutil.ReadFile(path)
+ if err != nil {
+ return nil, pkgerrors.Wrap(err, "Read YAML file error")
+ }
+
+ decode := scheme.Codecs.UniversalDeserializer().Decode
+ obj, _, err := decode(rawBytes, nil, into)
+ if err != nil {
+ return nil, pkgerrors.Wrap(err, "Deserialize YAML error")
+ }
+
+ return obj, nil
+}
+
+//EnsureDirectory makes sure that the directories specified in the path exist
+//If not, it will create them, if possible.
+func EnsureDirectory(f string) error {
+ base := path.Dir(f)
+ _, err := os.Stat(base)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+ return os.MkdirAll(base, 0755)
+}
diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go
new file mode 100644
index 00000000..9d715697
--- /dev/null
+++ b/src/rsync/pkg/resource/resource.go
@@ -0,0 +1,129 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+ pkgerrors "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+
+ utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
+ "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+)
+
+type Resource struct {
+}
+
+// Create deployment object in a specific Kubernetes cluster
+func (r Resource) Create(yamlFilePath string, namespace string, client connector.KubernetesConnector) (string, error) {
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ //Decode the yaml file to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Decode deployment object error")
+ }
+
+ dynClient := client.GetDynamicClient()
+ mapper := client.GetMapper()
+
+ gvk := unstruct.GroupVersionKind()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
+ }
+
+ //Add the tracking label to all resources created here
+ labels := unstruct.GetLabels()
+ //Check if labels exist for this object
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ unstruct.SetLabels(labels)
+
+ // This checks if the resource we are creating has a podSpec in it
+ // Eg: Deployment, StatefulSet, Job etc..
+ // If a PodSpec is found, the label will be added to it too.
+ connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+
+ gvr := mapping.Resource
+ var createdObj *unstructured.Unstructured
+
+ switch mapping.Scope.Name() {
+ case meta.RESTScopeNameNamespace:
+ createdObj, err = dynClient.Resource(gvr).Namespace(namespace).Create(unstruct, metav1.CreateOptions{})
+ case meta.RESTScopeNameRoot:
+ createdObj, err = dynClient.Resource(gvr).Create(unstruct, metav1.CreateOptions{})
+ default:
+ return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
+ }
+
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Create object error")
+ }
+
+ return createdObj.GetName(), nil
+}
+
+// Delete an existing resource hosted in a specific Kubernetes cluster
+func (r Resource) Delete(yamlFilePath string, resname string, namespace string, client connector.KubernetesConnector) error {
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ //Decode the yaml file to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Decode deployment object error")
+ }
+
+ dynClient := client.GetDynamicClient()
+ mapper := client.GetMapper()
+
+ gvk := unstruct.GroupVersionKind()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Mapping kind to resource error")
+ }
+
+ gvr := mapping.Resource
+ deletePolicy := metav1.DeletePropagationForeground
+ opts := &metav1.DeleteOptions{
+ PropagationPolicy: &deletePolicy,
+ }
+
+ switch mapping.Scope.Name() {
+ case meta.RESTScopeNameNamespace:
+ err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
+ case meta.RESTScopeNameRoot:
+ err = dynClient.Resource(gvr).Delete(resname, opts)
+ default:
+ return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
+ }
+
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete object error")
+ }
+ return nil
+}