diff options
author | Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com> | 2020-06-19 17:54:58 +0000 |
---|---|---|
committer | Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com> | 2020-06-24 22:58:20 +0000 |
commit | 81c8ffaa3046245caf3aff5bffe2b971d497ac3d (patch) | |
tree | 92804e8f522fa2ec352b4d3bea16d8c2abc645ac /src | |
parent | 7b860ae60bf9686b449ab2fe3f18c33944bdd71c (diff) |
Instantiation and termination of a given context implementation.
Issue-ID: MULTICLOUD-1005
Signed-off-by: Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Change-Id: I60e11aaad97b60efc24a02866dc0e580507e5296
Diffstat (limited to 'src')
-rw-r--r-- | src/rsync/cmd/main.go | 4 | ||||
-rw-r--r-- | src/rsync/go.mod | 12 | ||||
-rw-r--r-- | src/rsync/go.sum | 5 | ||||
-rw-r--r-- | src/rsync/pkg/app/client.go | 154 | ||||
-rw-r--r-- | src/rsync/pkg/connector/connector.go | 96 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 547 | ||||
-rw-r--r-- | src/rsync/pkg/grpc/installappserver/installappserver.go | 27 | ||||
-rw-r--r-- | src/rsync/pkg/internal/config/config.go | 128 | ||||
-rw-r--r-- | src/rsync/pkg/internal/utils.go | 59 | ||||
-rw-r--r-- | src/rsync/pkg/resource/resource.go | 129 |
10 files changed, 1144 insertions, 17 deletions
diff --git a/src/rsync/cmd/main.go b/src/rsync/cmd/main.go index f46fa79b..95c36e20 100644 --- a/src/rsync/cmd/main.go +++ b/src/rsync/cmd/main.go @@ -81,7 +81,6 @@ func main() { // Initialize the mongodb err := db.InitializeDatabaseConnection("mco") if err != nil { - fmt.Println(" Exiting mongod ") log.Println("Unable to initialize database connection...") log.Println(err) log.Fatalln("Exiting...") @@ -90,14 +89,13 @@ func main() { // Initialize contextdb err = contextDb.InitializeContextDatabase() if err != nil { - fmt.Println(" Exiting etcd") log.Println("Unable to initialize database connection...") log.Println(err) log.Fatalln("Exiting...") } // Start grpc - fmt.Println("starting rsync GRPC server..") + log.Println("starting rsync GRPC server..") err = startGrpcServer() if err != nil { log.Fatalf("GRPC server failed to start") diff --git a/src/rsync/go.mod b/src/rsync/go.mod index 9c3362ce..a2c5f83b 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -3,13 +3,15 @@ module github.com/onap/multicloud-k8s/src/rsync go 1.13 require ( - github.com/golang/protobuf v1.3.4 + github.com/golang/protobuf v1.4.1 + github.com/googleapis/gnostic v0.4.0 + github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect + github.com/mattn/go-isatty v0.0.4 // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4 - golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect - google.golang.org/appengine v1.4.0 // indirect + go.etcd.io/bbolt v1.3.3 // indirect google.golang.org/grpc v1.27.1 - honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect - github.com/googleapis/gnostic v0.4.0 k8s.io/kubernetes v1.14.1 ) diff --git a/src/rsync/go.sum b/src/rsync/go.sum index 270417c2..c49c5b07 100644 --- a/src/rsync/go.sum +++ b/src/rsync/go.sum @@ -742,6 +742,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2El google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200305110556-506484158171 h1:xes2Q2k+d/+YNXVw0FpZkIDJiaux4OVrRKXRAzH6A0U= google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -765,6 +767,9 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA= +google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/src/rsync/pkg/app/client.go b/src/rsync/pkg/app/client.go new file mode 100644 index 00000000..fb57d46b --- /dev/null +++ b/src/rsync/pkg/app/client.go @@ -0,0 +1,154 @@ +/* +Copyright 2018 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "os" + "strings" + "time" + "encoding/base64" + + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/discovery/cached/disk" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/clientcmd" + + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" +) + +const basePath string = "/tmp/rsync/" + +// KubernetesClient encapsulates the different clients' interfaces +// we need when interacting with a Kubernetes cluster +type KubernetesClient struct { + clientSet kubernetes.Interface + dynamicClient dynamic.Interface + discoverClient *disk.CachedDiscoveryClient + restMapper meta.RESTMapper + instanceID string +} + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +func (k *KubernetesClient) getKubeConfig(clustername string, id string) (string, error) { + + if !strings.Contains(clustername, "+") { + return "", pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return "", pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return "", pkgerrors.New("Get kubeconfig failed") + } + + var kubeConfigPath string = basePath + id + "/" + clustername + "/" + + if _, err := os.Stat(kubeConfigPath); os.IsNotExist(err) { + err = os.MkdirAll(kubeConfigPath, 0755) + if err != nil { + return "", err + } + } + kubeConfigPath = kubeConfigPath + "config" + + f, err := os.Create(kubeConfigPath) + defer f.Close() + if err != nil { + return "", err + } + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return "", err + } + _, err = f.Write(dec) + if err != nil { + return "", err + } + + return kubeConfigPath, nil +} + +// init loads the Kubernetes configuation values stored into the local configuration file +func (k *KubernetesClient) Init(clustername string, iid string) error { + if clustername == "" { + return pkgerrors.New("Cloudregion is empty") + } + + if iid == "" { + return pkgerrors.New("Instance ID is empty") + } + + k.instanceID = iid + + configPath, err := k.getKubeConfig(clustername, iid) + if err != nil { + return pkgerrors.Wrap(err, "Get kubeconfig file") + } + + //Remove kubeconfigfile after the clients are created + defer os.Remove(configPath) + + config, err := clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error") + } + + k.clientSet, err = kubernetes.NewForConfig(config) + if err != nil { + return err + } + + k.dynamicClient, err = dynamic.NewForConfig(config) + if err != nil { + return pkgerrors.Wrap(err, "Creating dynamic client") + } + + k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute) + if err != nil { + return pkgerrors.Wrap(err, "Creating discovery client") + } + + k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient) + + return nil +} + +//GetMapper returns the RESTMapper that was created for this client +func (k *KubernetesClient) GetMapper() meta.RESTMapper { + return k.restMapper +} + +//GetDynamicClient returns the dynamic client that is needed for +//unstructured REST calls to the apiserver +func (k *KubernetesClient) GetDynamicClient() dynamic.Interface { + return k.dynamicClient +} + +// GetStandardClient returns the standard client that can be used to handle +// standard kubernetes kinds +func (k *KubernetesClient) GetStandardClient() kubernetes.Interface { + return k.clientSet +} + +//GetInstanceID returns the instanceID that is injected into all the +//resources created by the plugin +func (k *KubernetesClient) GetInstanceID() string { + return k.instanceID +} diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go new file mode 100644 index 00000000..6e17f87a --- /dev/null +++ b/src/rsync/pkg/connector/connector.go @@ -0,0 +1,96 @@ +/* + * Copyright 2019 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package connector + +import ( + "log" + + "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" +) + +// KubernetesConnector is an interface that is expected to be implemented +// by any code that calls the plugin framework functions. +// It implements methods that are needed by the plugins to get Kubernetes +// clients and other information needed to interface with Kubernetes +type KubernetesConnector interface { + //GetMapper returns the RESTMapper that was created for this client + GetMapper() meta.RESTMapper + + //GetDynamicClient returns the dynamic client that is needed for + //unstructured REST calls to the apiserver + GetDynamicClient() dynamic.Interface + + // GetStandardClient returns the standard client that can be used to handle + // standard kubernetes kinds + GetStandardClient() kubernetes.Interface + + //GetInstanceID returns the InstanceID for tracking during creation + GetInstanceID() string +} + +// Reference is the interface that is implemented +type Reference interface { + //Create a kubernetes resource described by the yaml in yamlFilePath + Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error) + //Delete a kubernetes resource described in the provided namespace + Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error +} + +// TagPodsIfPresent finds the PodTemplateSpec from any workload +// object that contains it and changes the spec to include the tag label +func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { + + spec, ok := unstruct.Object["spec"].(map[string]interface{}) + if !ok { + log.Println("Error converting spec to map") + return + } + + template, ok := spec["template"].(map[string]interface{}) + if !ok { + log.Println("Error converting template to map") + return + } + + //Attempt to convert the template to a podtemplatespec. + //This is to check if we have any pods being created. + podTemplateSpec := &corev1.PodTemplateSpec{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec) + if err != nil { + log.Println("Did not find a podTemplateSpec: " + err.Error()) + return + } + + labels := podTemplateSpec.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labels[config.GetConfiguration().KubernetesLabelName] = tag + podTemplateSpec.SetLabels(labels) + + updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) + + //Set the label + spec["template"] = updatedTemplate +} diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go new file mode 100644 index 00000000..67e589c9 --- /dev/null +++ b/src/rsync/pkg/context/context.go @@ -0,0 +1,547 @@ +/* + * Copyright 2020 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package context + +import ( + "encoding/json" + "fmt" + "log" + "os" + "sync" + "strings" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" + pkgerrors "github.com/pkg/errors" + res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + "github.com/onap/multicloud-k8s/src/rsync/pkg/app" +) + +type CompositeAppContext struct { + cid interface{} + appsorder string + appsdependency string + appsmap []instMap +} +type clusterInfo struct { + name string + resorder string + resdependency string + ressmap []instMap +} +type instMap struct { + name string + depinfo string + status string + rerr error + clusters []clusterInfo +} + +const basePath string = "/tmp/rsync/" + +func getInstMap(order string, dependency string, level string) ([]instMap, error) { + + if order == "" { + return nil, pkgerrors.Errorf("Not a valid order value") + } + if dependency == "" { + return nil, pkgerrors.Errorf("Not a valid dependency value") + } + + if !(level == "app" || level == "res") { + return nil, pkgerrors.Errorf("Not a valid level name given to create map") + } + + + var aov map[string]interface{} + json.Unmarshal([]byte(order), &aov) + + s := fmt.Sprintf("%vorder", level) + appso := aov[s].([]interface{}) + var instmap = make([]instMap, len(appso)) + + var adv map[string]interface{} + json.Unmarshal([]byte(dependency), &adv) + s = fmt.Sprintf("%vdependency", level) + appsd := adv[s].(map[string]interface{}) + for i, u := range appso { + instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} + } + + return instmap, nil +} + +func deleteResource(clustername string, resname string, respath string) error { + k8sClient := app.KubernetesClient{} + err := k8sClient.Init(clustername, resname) + if err != nil { + log.Println("Init failed: " + err.Error()) + return err + } + + var c con.KubernetesConnector + c = &k8sClient + var gp res.Resource + err = gp.Delete(respath, resname, "default", c) + if err != nil { + log.Println("Delete resource failed: " + err.Error()) + return err + } + log.Println("Resource succesfully deleted") + return nil + +} + +func createResource(clustername string, resname string, respath string) error { + k8sClient := app.KubernetesClient{} + err := k8sClient.Init(clustername, resname) + if err != nil { + log.Println("Client init failed: " + err.Error()) + return err + } + + var c con.KubernetesConnector + c = &k8sClient + var gp res.Resource + _, err = gp.Create(respath,"default", c) + if err != nil { + log.Println("Create failed: " + err.Error()) + return err + } + log.Println("Resource succesfully created") + return nil + +} + +func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { + var resPath string = basePath + appname + "/" + clustername + "/resources/" + rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) + if err != nil { + return err + } + + resval, err := ac.GetValue(rh) + if err != nil { + return err + } + + if resval != "" { + if _, err := os.Stat(resPath); os.IsNotExist(err) { + err = os.MkdirAll(resPath, 0755) + if err != nil { + return err + } + } + resPath := resPath + resmap.name + ".yaml" + f, err := os.Create(resPath) + defer f.Close() + if err != nil { + return err + } + _, err = f.WriteString(resval.(string)) + if err != nil { + return err + } + result := strings.Split(resmap.name, "+") + if result[0] == "" { + return pkgerrors.Errorf("Resource name is nil") + } + err = deleteResource(clustername, result[0], resPath) + if err != nil { + return err + } + //defer os.Remove(resPath) + } else { + return pkgerrors.Errorf("Resource value is nil") + } + + return nil + +} + +func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { + var resPath string = basePath + appname + "/" + clustername + "/resources/" + rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) + if err != nil { + return err + } + + resval, err := ac.GetValue(rh) + if err != nil { + return err + } + + if resval != "" { + if _, err := os.Stat(resPath); os.IsNotExist(err) { + err = os.MkdirAll(resPath, 0755) + if err != nil { + return err + } + } + resPath := resPath + resmap.name + ".yaml" + f, err := os.Create(resPath) + defer f.Close() + if err != nil { + return err + } + _, err = f.WriteString(resval.(string)) + if err != nil { + return err + } + result := strings.Split(resmap.name, "+") + if result[0] == "" { + return pkgerrors.Errorf("Resource name is nil") + } + err = createResource(clustername, result[0], resPath) + if err != nil { + return err + } + //defer os.Remove(resPath) + } else { + return pkgerrors.Errorf("Resource value is nil") + } + + return nil + +} + +func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i:=0; i<len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <- chans[index] + if c != index { + ressmap[index].status = "error" + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + wg.Done() + return + } + ressmap[index].status = "start" + } + ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) + for j:=0; j<len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k:=0; k<len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources termination") + } + } + return nil + +} + +func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(ressmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i:=0; i<len(ressmap); i++ { + wg.Add(1) + go func(index int) { + if ressmap[index].depinfo == "go" { + ressmap[index].status = "start" + } else { + ressmap[index].status = "waiting" + c := <- chans[index] + if c != index { + ressmap[index].status = "error" + ressmap[index].rerr = pkgerrors.Errorf("channel does not match") + wg.Done() + return + } + ressmap[index].status = "start" + } + ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername) + ressmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v",ressmap[index].name) + for j:=0; j<len(ressmap); j++ { + if ressmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k:=0; k<len(ressmap); k++ { + if ressmap[k].rerr != nil { + return pkgerrors.Errorf("error during resources instantiation") + } + } + return nil + +} + +func terminateApp(ac appcontext.AppContext, appmap instMap) error { + + for i:=0; i<len(appmap.clusters); i++ { + err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, + appmap.clusters[i].name) + if err != nil { + return err + } + } + log.Println("Termination of app done: " + appmap.name) + + return nil + +} + + +func instantiateApp(ac appcontext.AppContext, appmap instMap) error { + + for i:=0; i<len(appmap.clusters); i++ { + err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, + appmap.clusters[i].name) + if err != nil { + return err + } + } + log.Println("Instantiation of app done: " + appmap.name) + return nil + +} + +func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(appsmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i:=0; i<len(appsmap); i++ { + wg.Add(1) + go func(index int) { + if appsmap[index].depinfo == "go" { + appsmap[index].status = "start" + } else { + appsmap[index].status = "waiting" + c := <- chans[index] + if c != index { + appsmap[index].status = "error" + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + wg.Done() + return + } + appsmap[index].status = "start" + } + appsmap[index].rerr = instantiateApp(ac, appsmap[index]) + appsmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) + for j:=0; j<len(appsmap); j++ { + if appsmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k:=0; k<len(appsmap); k++ { + if appsmap[k].rerr != nil { + return pkgerrors.Errorf("error during apps instantiation") + } + } + return nil + +} + +func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { + ac := appcontext.AppContext{} + + _, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + instca.cid = cid + + appsorder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + instca.appsorder = appsorder.(string) + appsdependency, err := ac.GetAppInstruction("dependency") + if err != nil { + return err + } + instca.appsdependency = appsdependency.(string) + instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") + if err != nil { + return err + } + + for j:=0; j<len(instca.appsmap); j++ { + clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) + if err != nil { + return err + } + instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) + for k:=0; k<len(clusternames); k++ { + instca.appsmap[j].clusters[k].name = clusternames[k] + resorder, err := ac.GetResourceInstruction( + instca.appsmap[j].name, clusternames[k], "order") + if err != nil { + return err + } + instca.appsmap[j].clusters[k].resorder = resorder.(string) + + resdependency, err := ac.GetResourceInstruction( + instca.appsmap[j].name, clusternames[k], "dependency") + if err != nil { + return err + } + instca.appsmap[j].clusters[k].resdependency = resdependency.(string) + + instca.appsmap[j].clusters[k].ressmap, err = getInstMap( + instca.appsmap[j].clusters[k].resorder, + instca.appsmap[j].clusters[k].resdependency, "res") + if err != nil { + return err + } + } + } + err = instantiateApps(ac, instca.appsmap) + if err != nil { + return err + } + + return nil +} + +// Delete all the apps +func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { + var wg sync.WaitGroup + var chans = make([]chan int, len(appsmap)) + for l := range chans { + chans[l] = make(chan int) + } + for i:=0; i<len(appsmap); i++ { + wg.Add(1) + go func(index int) { + if appsmap[index].depinfo == "go" { + appsmap[index].status = "start" + } else { + appsmap[index].status = "waiting" + c := <- chans[index] + if c != index { + appsmap[index].status = "error" + appsmap[index].rerr = pkgerrors.Errorf("channel does not match") + wg.Done() + return + } + appsmap[index].status = "start" + } + appsmap[index].rerr = terminateApp(ac, appsmap[index]) + appsmap[index].status = "done" + waitstr := fmt.Sprintf("wait on %v",appsmap[index].name) + for j:=0; j<len(appsmap); j++ { + if appsmap[j].depinfo == waitstr { + chans[j] <- j + } + } + wg.Done() + }(i) + } + wg.Wait() + for k:=0; k<len(appsmap); k++ { + if appsmap[k].rerr != nil { + return pkgerrors.Errorf("error during apps instantiation") + } + } + return nil + +} +// Delete all the resources for a given context +func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { + ac := appcontext.AppContext{} + + _, err := ac.LoadAppContext(cid) + if err != nil { + return err + } + instca.cid = cid + + appsorder, err := ac.GetAppInstruction("order") + if err != nil { + return err + } + instca.appsorder = appsorder.(string) + appsdependency, err := ac.GetAppInstruction("dependency") + if err != nil { + return err + } + instca.appsdependency = appsdependency.(string) + instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app") + if err != nil { + return err + } + + for j:=0; j<len(instca.appsmap); j++ { + clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) + if err != nil { + return err + } + instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) + for k:=0; k<len(clusternames); k++ { + instca.appsmap[j].clusters[k].name = clusternames[k] + resorder, err := ac.GetResourceInstruction( + instca.appsmap[j].name, clusternames[k], "order") + if err != nil { + return err + } + instca.appsmap[j].clusters[k].resorder = resorder.(string) + + resdependency, err := ac.GetResourceInstruction( + instca.appsmap[j].name, clusternames[k], "dependency") + if err != nil { + return err + } + instca.appsmap[j].clusters[k].resdependency = resdependency.(string) + + instca.appsmap[j].clusters[k].ressmap, err = getInstMap( + instca.appsmap[j].clusters[k].resorder, + instca.appsmap[j].clusters[k].resdependency, "res") + if err != nil { + return err + } + } + } + err = terminateApps(ac, instca.appsmap) + if err != nil { + return err + } + + return nil + +} diff --git a/src/rsync/pkg/grpc/installappserver/installappserver.go b/src/rsync/pkg/grpc/installappserver/installappserver.go index 28b4a585..68118ade 100644 --- a/src/rsync/pkg/grpc/installappserver/installappserver.go +++ b/src/rsync/pkg/grpc/installappserver/installappserver.go @@ -17,10 +17,8 @@ import ( "context" "encoding/json" "log" - "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp" - //"google.golang.org/grpc/codes" - //"google.golang.org/grpc/status" + con "github.com/onap/multicloud-k8s/src/rsync/pkg/context" ) type installappServer struct { @@ -31,10 +29,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst installAppReq, _ := json.Marshal(req) log.Println("GRPC Server received installAppRequest: ", string(installAppReq)) - // Insert call to Server Functionality here - // - // - + // Try instantiate the comp app + instca := con.CompositeAppContext{} + err := instca.InstantiateComApp(req.GetAppContext()) + if err != nil { + log.Println("Instantiation failed: " + err.Error()) + err := instca.TerminateComApp(req.GetAppContext()) + if err != nil { + log.Println("Termination failed: " + err.Error()) + } + return &installapp.InstallAppResponse{AppContextInstalled: false}, err + } return &installapp.InstallAppResponse{AppContextInstalled: true}, nil } @@ -43,8 +48,12 @@ func (cs *installappServer) UninstallApp(ctx context.Context, req *installapp.Un log.Println("GRPC Server received uninstallAppRequest: ", string(uninstallAppReq)) // Try terminating the comp app here - // - // + instca := con.CompositeAppContext{} + err := instca.TerminateComApp(req.GetAppContext()) + if err != nil { + log.Println("Termination failed: " + err.Error()) + return &installapp.UninstallAppResponse{AppContextUninstalled: false}, err + } return &installapp.UninstallAppResponse{AppContextUninstalled: true}, nil } diff --git a/src/rsync/pkg/internal/config/config.go b/src/rsync/pkg/internal/config/config.go new file mode 100644 index 00000000..89f2553d --- /dev/null +++ b/src/rsync/pkg/internal/config/config.go @@ -0,0 +1,128 @@ +/* + * Copyright 2019 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "encoding/json" + "log" + "os" + "reflect" +) + +// Configuration loads up all the values that are used to configure +// backend implementations +type Configuration struct { + CAFile string `json:"ca-file"` + ServerCert string `json:"server-cert"` + ServerKey string `json:"server-key"` + Password string `json:"password"` + DatabaseAddress string `json:"database-address"` + DatabaseType string `json:"database-type"` + PluginDir string `json:"plugin-dir"` + EtcdIP string `json:"etcd-ip"` + EtcdCert string `json:"etcd-cert"` + EtcdKey string `json:"etcd-key"` + EtcdCAFile string `json:"etcd-ca-file"` + ServicePort string `json:"service-port"` + KubernetesLabelName string `json:"kubernetes-label-name"` +} + +// Config is the structure that stores the configuration +var gConfig *Configuration + +// readConfigFile reads the specified smsConfig file to setup some env variables +func readConfigFile(file string) (*Configuration, error) { + f, err := os.Open(file) + if err != nil { + return defaultConfiguration(), err + } + defer f.Close() + + // Setup some defaults here + // If the json file has values in it, the defaults will be overwritten + conf := defaultConfiguration() + + // Read the configuration from json file + decoder := json.NewDecoder(f) + err = decoder.Decode(conf) + if err != nil { + return conf, err + } + + return conf, nil +} + +func defaultConfiguration() *Configuration { + cwd, err := os.Getwd() + if err != nil { + log.Println("Error getting cwd. Using .") + cwd = "." + } + + return &Configuration{ + CAFile: "ca.cert", + ServerCert: "server.cert", + ServerKey: "server.key", + Password: "", + DatabaseAddress: "127.0.0.1", + DatabaseType: "mongo", + PluginDir: cwd, + EtcdIP: "127.0.0.1", + EtcdCert: "", + EtcdKey: "", + EtcdCAFile: "", + ServicePort: "9015", + KubernetesLabelName: "k8splugin.io/rb-instance-id", + } +} + +// GetConfiguration returns the configuration for the app. +// It will try to load it if it is not already loaded. +func GetConfiguration() *Configuration { + if gConfig == nil { + conf, err := readConfigFile("k8sconfig.json") + if err != nil { + log.Println("Error loading config file. Using defaults.") + } + gConfig = conf + } + + return gConfig +} + +// SetConfigValue sets a value in the configuration +// This is mostly used to customize the application and +// should be used carefully. +func SetConfigValue(key string, value string) *Configuration { + c := GetConfiguration() + if value == "" || key == "" { + return c + } + + v := reflect.ValueOf(c).Elem() + if v.Kind() == reflect.Struct { + f := v.FieldByName(key) + if f.IsValid() { + if f.CanSet() { + if f.Kind() == reflect.String { + f.SetString(value) + } + } + } + } + return c +} diff --git a/src/rsync/pkg/internal/utils.go b/src/rsync/pkg/internal/utils.go new file mode 100644 index 00000000..59ff6df8 --- /dev/null +++ b/src/rsync/pkg/internal/utils.go @@ -0,0 +1,59 @@ +/* +Copyright 2018 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "io/ioutil" + "os" + "path" + + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" +) + +// DecodeYAML reads a YAMl file to extract the Kubernetes object definition +func DecodeYAML(path string, into runtime.Object) (runtime.Object, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return nil, pkgerrors.New("File " + path + " not found") + } else { + return nil, pkgerrors.Wrap(err, "Stat file error") + } + } + + rawBytes, err := ioutil.ReadFile(path) + if err != nil { + return nil, pkgerrors.Wrap(err, "Read YAML file error") + } + + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(rawBytes, nil, into) + if err != nil { + return nil, pkgerrors.Wrap(err, "Deserialize YAML error") + } + + return obj, nil +} + +//EnsureDirectory makes sure that the directories specified in the path exist +//If not, it will create them, if possible. +func EnsureDirectory(f string) error { + base := path.Dir(f) + _, err := os.Stat(base) + if err != nil && !os.IsNotExist(err) { + return err + } + return os.MkdirAll(base, 0755) +} diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go new file mode 100644 index 00000000..9d715697 --- /dev/null +++ b/src/rsync/pkg/resource/resource.go @@ -0,0 +1,129 @@ +/* +Copyright 2018 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resource + +import ( + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" + "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config" + "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" +) + +type Resource struct { +} + +// Create deployment object in a specific Kubernetes cluster +func (r Resource) Create(yamlFilePath string, namespace string, client connector.KubernetesConnector) (string, error) { + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAML(yamlFilePath, unstruct) + if err != nil { + return "", pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return "", pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + //Add the tracking label to all resources created here + labels := unstruct.GetLabels() + //Check if labels exist for this object + if labels == nil { + labels = map[string]string{} + } + labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + unstruct.SetLabels(labels) + + // This checks if the resource we are creating has a podSpec in it + // Eg: Deployment, StatefulSet, Job etc.. + // If a PodSpec is found, the label will be added to it too. + connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + + gvr := mapping.Resource + var createdObj *unstructured.Unstructured + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + createdObj, err = dynClient.Resource(gvr).Namespace(namespace).Create(unstruct, metav1.CreateOptions{}) + case meta.RESTScopeNameRoot: + createdObj, err = dynClient.Resource(gvr).Create(unstruct, metav1.CreateOptions{}) + default: + return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String()) + } + + if err != nil { + return "", pkgerrors.Wrap(err, "Create object error") + } + + return createdObj.GetName(), nil +} + +// Delete an existing resource hosted in a specific Kubernetes cluster +func (r Resource) Delete(yamlFilePath string, resname string, namespace string, client connector.KubernetesConnector) error { + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAML(yamlFilePath, unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + gvr := mapping.Resource + deletePolicy := metav1.DeletePropagationForeground + opts := &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + } + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) + case meta.RESTScopeNameRoot: + err = dynClient.Resource(gvr).Delete(resname, opts) + default: + return pkgerrors.New("Got an unknown RESTSCopeName for mappin") + } + + if err != nil { + return pkgerrors.Wrap(err, "Delete object error") + } + return nil +} |