diff options
author | Eric Multanen <eric.w.multanen@intel.com> | 2020-07-15 18:43:10 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-07-15 18:43:10 +0000 |
commit | ad17b4360890fc2915795515ac265fc66720f4ad (patch) | |
tree | bf5c2f68b3b43178f661f7c890e9f02a496e9b56 /src/rsync/pkg | |
parent | 8223d0671617ee6dcc68307aefd3634e1bb0ac8d (diff) | |
parent | b986e8938aaa26945dc7dcdcb990ec8aa53afff0 (diff) |
Merge "Update Rsync"
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r-- | src/rsync/pkg/app/client.go | 130 | ||||
-rw-r--r-- | src/rsync/pkg/client/apply.go | 93 | ||||
-rw-r--r-- | src/rsync/pkg/client/client.go | 190 | ||||
-rw-r--r-- | src/rsync/pkg/client/create.go | 55 | ||||
-rw-r--r-- | src/rsync/pkg/client/delete.go | 95 | ||||
-rw-r--r-- | src/rsync/pkg/client/factory.go | 291 | ||||
-rw-r--r-- | src/rsync/pkg/client/helpers.go | 75 | ||||
-rw-r--r-- | src/rsync/pkg/client/patch.go | 216 | ||||
-rw-r--r-- | src/rsync/pkg/client/replace.go | 51 | ||||
-rw-r--r-- | src/rsync/pkg/connector/connector.go | 168 | ||||
-rw-r--r-- | src/rsync/pkg/context/context.go | 578 | ||||
-rw-r--r-- | src/rsync/pkg/internal/utils.go | 44 | ||||
-rw-r--r-- | src/rsync/pkg/resource/resource.go | 130 |
13 files changed, 1360 insertions, 756 deletions
diff --git a/src/rsync/pkg/app/client.go b/src/rsync/pkg/app/client.go deleted file mode 100644 index 49997ed0..00000000 --- a/src/rsync/pkg/app/client.go +++ /dev/null @@ -1,130 +0,0 @@ -/* -Copyright 2018 Intel Corporation. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "os" - "strings" - "time" - "encoding/base64" - - pkgerrors "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/discovery/cached/disk" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/restmapper" - "k8s.io/client-go/tools/clientcmd" - - "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" -) - - -// KubernetesClient encapsulates the different clients' interfaces -// we need when interacting with a Kubernetes cluster -type KubernetesClient struct { - clientSet kubernetes.Interface - dynamicClient dynamic.Interface - discoverClient *disk.CachedDiscoveryClient - restMapper meta.RESTMapper - instanceID string -} - -// getKubeConfig uses the connectivity client to get the kubeconfig based on the name -// of the clustername. This is written out to a file. -func (k *KubernetesClient) getKubeConfig(clustername string, id string) ([]byte, error) { - - if !strings.Contains(clustername, "+") { - return nil, pkgerrors.New("Not a valid cluster name") - } - strs := strings.Split(clustername, "+") - if len(strs) != 2 { - return nil, pkgerrors.New("Not a valid cluster name") - } - kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) - if err != nil { - return nil, pkgerrors.New("Get kubeconfig failed") - } - - dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) - if err != nil { - return nil, err - } - return dec, nil -} - -// init loads the Kubernetes configuation values stored into the local configuration file -func (k *KubernetesClient) Init(clustername string, iid string) error { - if clustername == "" { - return pkgerrors.New("Cloudregion is empty") - } - - if iid == "" { - return pkgerrors.New("Instance ID is empty") - } - - k.instanceID = iid - - configData, err := k.getKubeConfig(clustername, iid) - if err != nil { - return pkgerrors.Wrap(err, "Get kubeconfig file") - } - - config, err := clientcmd.RESTConfigFromKubeConfig(configData) - if err != nil { - return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error") - } - - k.clientSet, err = kubernetes.NewForConfig(config) - if err != nil { - return err - } - - k.dynamicClient, err = dynamic.NewForConfig(config) - if err != nil { - return pkgerrors.Wrap(err, "Creating dynamic client") - } - - k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute) - if err != nil { - return pkgerrors.Wrap(err, "Creating discovery client") - } - - k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient) - - return nil -} - -//GetMapper returns the RESTMapper that was created for this client -func (k *KubernetesClient) GetMapper() meta.RESTMapper { - return k.restMapper -} - -//GetDynamicClient returns the dynamic client that is needed for -//unstructured REST calls to the apiserver -func (k *KubernetesClient) GetDynamicClient() dynamic.Interface { - return k.dynamicClient -} - -// GetStandardClient returns the standard client that can be used to handle -// standard kubernetes kinds -func (k *KubernetesClient) GetStandardClient() kubernetes.Interface { - return k.clientSet -} - -//GetInstanceID returns the instanceID that is injected into all the -//resources created by the plugin -func (k *KubernetesClient) GetInstanceID() string { - return k.instanceID -} diff --git a/src/rsync/pkg/client/apply.go b/src/rsync/pkg/client/apply.go new file mode 100644 index 00000000..96233a31 --- /dev/null +++ b/src/rsync/pkg/client/apply.go @@ -0,0 +1,93 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/kubectl/pkg/util" +) + +// Apply creates a resource with the given content +func (c *Client) Apply(content []byte) error { + r := c.ResultForContent(content, nil) + return c.ApplyResource(r) +} + +// ApplyFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) ApplyFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.ApplyResource(r) +} + +// ApplyResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) ApplyResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + + // Is ServerSideApply requested + if c.ServerSideApply { + return r.Visit(serverSideApply) + } + + return r.Visit(apply) +} + +func apply(info *resource.Info, err error) error { + if err != nil { + return failedTo("apply", info, err) + } + + // If it does not exists, just create it + current, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, info.Export) + if err != nil { + if !errors.IsNotFound(err) { + return failedTo("retrieve current configuration", info, err) + } + if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil { + return failedTo("set annotation", info, err) + } + return create(info, nil) + } + + // If exists, patch it + return patch(info, current) +} + +func serverSideApply(info *resource.Info, err error) error { + if err != nil { + return failedTo("serverside apply", info, err) + } + + data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object) + if err != nil { + return failedTo("encode for the serverside apply", info, err) + } + + options := metav1.PatchOptions{ + // TODO: Find out how to get the force conflict flag + // Force: &forceConflicts, + // FieldManager: FieldManager, + } + obj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, types.ApplyPatchType, data, &options) + if err != nil { + return failedTo("serverside patch", info, err) + } + info.Refresh(obj, true) + return nil +} diff --git a/src/rsync/pkg/client/client.go b/src/rsync/pkg/client/client.go new file mode 100644 index 00000000..0eaded22 --- /dev/null +++ b/src/rsync/pkg/client/client.go @@ -0,0 +1,190 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "bytes" + "fmt" + "io" + + v1 "k8s.io/api/core/v1" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + "k8s.io/kubectl/pkg/validation" +) + +// DefaultValidation default action to validate. If `true` all resources by +// default will be validated. +const DefaultValidation = true + +// Client is a kubernetes client, like `kubectl` +type Client struct { + Clientset *kubernetes.Clientset + factory *factory + validator validation.Schema + namespace string + enforceNamespace bool + forceConflicts bool + ServerSideApply bool +} + +// Result is an alias for the Kubernetes CLI runtime resource.Result +type Result = resource.Result + +// BuilderOptions parameters to create a Resource Builder +type BuilderOptions struct { + Unstructured bool + Validate bool + Namespace string + LabelSelector string + FieldSelector string + All bool + AllNamespaces bool +} + +// NewBuilderOptions creates a BuilderOptions with the default values for +// the parameters to create a Resource Builder +func NewBuilderOptions() *BuilderOptions { + return &BuilderOptions{ + Unstructured: true, + Validate: true, + } +} + +// NewE creates a kubernetes client, returns an error if fail +func NewE(context, kubeconfig string, ns string) (*Client, error) { + var namespace string + var enforceNamespace bool + var err error + factory := newFactory(context, kubeconfig) + + // If `true` it will always validate the given objects/resources + // Unless something different is specified in the NewBuilderOptions + validator, _ := factory.Validator(DefaultValidation) + + if ns == "" { + namespace, enforceNamespace, err = factory.ToRawKubeConfigLoader().Namespace() + if err != nil { + namespace = v1.NamespaceDefault + enforceNamespace = true + } + } else { + namespace = ns + enforceNamespace = false + } + clientset, err := factory.KubernetesClientSet() + if err != nil { + return nil, err + } + if clientset == nil { + return nil, fmt.Errorf("cannot create a clientset from given context and kubeconfig") + } + + return &Client{ + factory: factory, + Clientset: clientset, + validator: validator, + namespace: namespace, + enforceNamespace: enforceNamespace, + }, nil +} + +// New creates a kubernetes client +func New(context, kubeconfig string, namespace string) *Client { + client, _ := NewE(context, kubeconfig, namespace) + return client +} + +// Builder creates a resource builder +func (c *Client) builder(opt *BuilderOptions) *resource.Builder { + validator := c.validator + namespace := c.namespace + + if opt == nil { + opt = NewBuilderOptions() + } else { + if opt.Validate != DefaultValidation { + validator, _ = c.factory.Validator(opt.Validate) + } + if opt.Namespace != "" { + namespace = opt.Namespace + } + } + + b := c.factory.NewBuilder() + if opt.Unstructured { + b = b.Unstructured() + } + + return b. + Schema(validator). + ContinueOnError(). + NamespaceParam(namespace).DefaultNamespace() +} + +// ResultForFilenameParam returns the builder results for the given list of files or URLs +func (c *Client) ResultForFilenameParam(filenames []string, opt *BuilderOptions) *Result { + filenameOptions := &resource.FilenameOptions{ + Recursive: false, + Filenames: filenames, + } + + return c.builder(opt). + FilenameParam(c.enforceNamespace, filenameOptions). + Flatten(). + Do() +} + +// ResultForReader returns the builder results for the given reader +func (c *Client) ResultForReader(r io.Reader, opt *BuilderOptions) *Result { + return c.builder(opt). + Stream(r, ""). + Flatten(). + Do() +} + +// func (c *Client) ResultForName(opt *BuilderOptions, names ...string) *Result { +// return c.builder(opt). +// LabelSelectorParam(opt.LabelSelector). +// FieldSelectorParam(opt.FieldSelector). +// SelectAllParam(opt.All). +// AllNamespaces(opt.AllNamespaces). +// ResourceTypeOrNameArgs(false, names...).RequireObject(false). +// Flatten(). +// Do() +// } + +// ResultForContent returns the builder results for the given content +func (c *Client) ResultForContent(content []byte, opt *BuilderOptions) *Result { + b := bytes.NewBuffer(content) + return c.ResultForReader(b, opt) +} + +func failedTo(action string, info *resource.Info, err error) error { + var resKind string + if info.Mapping != nil { + resKind = info.Mapping.GroupVersionKind.Kind + " " + } + + return fmt.Errorf("cannot %s object Kind: %q, Name: %q, Namespace: %q. %s", action, resKind, info.Name, info.Namespace, err) +} + +// IsReachable tests connectivity to the cluster +func (c *Client) IsReachable() error { + client, _ := c.factory.KubernetesClientSet() + _, err := client.ServerVersion() + if err != nil { + return fmt.Errorf("Kubernetes cluster unreachable") + } + return nil +}
\ No newline at end of file diff --git a/src/rsync/pkg/client/create.go b/src/rsync/pkg/client/create.go new file mode 100644 index 00000000..755420ca --- /dev/null +++ b/src/rsync/pkg/client/create.go @@ -0,0 +1,55 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/resource" +) + +// Create creates a resource with the given content +func (c *Client) Create(content []byte) error { + r := c.ResultForContent(content, nil) + return c.CreateResource(r) +} + +// CreateFile creates a resource with the given content +func (c *Client) CreateFile(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.CreateResource(r) +} + +// CreateResource creates the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) CreateResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(create) +} + +func create(info *resource.Info, err error) error { + if err != nil { + return failedTo("create", info, err) + } + + // TODO: If will be allow to do create then apply, here must be added the annotation as in Apply/Patch + + options := metav1.CreateOptions{} + obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options) + if err != nil { + return failedTo("create", info, err) + } + info.Refresh(obj, true) + + return nil +} diff --git a/src/rsync/pkg/client/delete.go b/src/rsync/pkg/client/delete.go new file mode 100644 index 00000000..1e6aa46a --- /dev/null +++ b/src/rsync/pkg/client/delete.go @@ -0,0 +1,95 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/cli-runtime/pkg/resource" +) + +const ( + // Period of time in seconds given to the resource to terminate gracefully when delete it (used when require to recreate the resource). Ignored if negative. Set to 1 for immediate shutdown. Can only be set to 0 when force is true (force deletion) + gracePeriod = -1 + // If true, cascade the deletion of the resources managed by this resource (e.g. Pods created by a ReplicationController). + cascade = true +) + +// Delete creates a resource with the given content +func (c *Client) Delete(content []byte) error { + r := c.ResultForContent(content, nil) + return c.DeleteResource(r) +} + +// DeleteFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) DeleteFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.DeleteResource(r) +} + +// DeleteResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) DeleteResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(delete) +} + +func delete(info *resource.Info, err error) error { + if err != nil { + return failedTo("delete", info, err) + } + + // TODO: Background or Foreground? + // policy := metav1.DeletePropagationForeground + policy := metav1.DeletePropagationBackground + options := metav1.DeleteOptions{ + PropagationPolicy: &policy, + } + + if _, err := deleteWithOptions(info, &options); err != nil { + return failedTo("delete", info, err) + } + return nil +} + +func defaultDeleteOptions() *metav1.DeleteOptions { + // TODO: Change DryRun value when DryRun is implemented + dryRun := false + + options := &metav1.DeleteOptions{} + if gracePeriod >= 0 { + options = metav1.NewDeleteOptions(int64(gracePeriod)) + } + + if dryRun { + options.DryRun = []string{metav1.DryRunAll} + } + + // TODO: Background or Foreground? + // policy := metav1.DeletePropagationBackground + policy := metav1.DeletePropagationForeground + if !cascade { + policy = metav1.DeletePropagationOrphan + } + options.PropagationPolicy = &policy + + return options +} + +func deleteWithOptions(info *resource.Info, options *metav1.DeleteOptions) (runtime.Object, error) { + if options == nil { + options = defaultDeleteOptions() + } + return resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, options) +} diff --git a/src/rsync/pkg/client/factory.go b/src/rsync/pkg/client/factory.go new file mode 100644 index 00000000..39c1a177 --- /dev/null +++ b/src/rsync/pkg/client/factory.go @@ -0,0 +1,291 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "path/filepath" + "regexp" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/discovery" + diskcached "k8s.io/client-go/discovery/cached/disk" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "k8s.io/kubectl/pkg/util/openapi" + openapivalidation "k8s.io/kubectl/pkg/util/openapi/validation" + "k8s.io/kubectl/pkg/validation" +) + +// factory implements the kubectl Factory interface which also requieres to +// implement the genericclioptions.RESTClientGetter interface. +// The Factory inerface provides abstractions that allow the Kubectl command to +// be extended across multiple types of resources and different API sets. +type factory struct { + KubeConfig string + Context string + initOpenAPIGetterOnce sync.Once + openAPIGetter openapi.Getter +} + +// If multiple clients are created, this sync.once make sure the CRDs are added +// only once into the API extensions v1 and v1beta schemes +var addToSchemeOnce sync.Once + +var _ genericclioptions.RESTClientGetter = &factory{} + +// newFactory creates a new client factory which encapsulate a REST client getter +func newFactory(context, kubeconfig string) *factory { + factory := &factory{ + KubeConfig: kubeconfig, + Context: context, + } + + // From: helm/pkg/kube/client.go > func New() + // Add CRDs to the scheme. They are missing by default. + addToSchemeOnce.Do(func() { + if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { + panic(err) + } + if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil { + panic(err) + } + }) + + return factory +} + +// BuildRESTConfig builds a kubernetes REST client factory using the following +// rules from ToRawKubeConfigLoader() +// func BuildRESTConfig(context, kubeconfig string) (*rest.Config, error) { +// return newFactory(context, kubeconfig).ToRESTConfig() +// } + +// ToRESTConfig creates a kubernetes REST client factory. +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRESTConfig() (*rest.Config, error) { + // From: k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go > func setKubernetesDefaults() + config, err := f.ToRawKubeConfigLoader().ClientConfig() + if err != nil { + return nil, err + } + + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + } + if config.APIPath == "" { + config.APIPath = "/api" + } + if config.NegotiatedSerializer == nil { + // This codec config ensures the resources are not converted. Therefore, resources + // will not be round-tripped through internal versions. Defaulting does not happen + // on the client. + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + } + + rest.SetKubernetesDefaults(config) + return config, nil +} + +// ToRawKubeConfigLoader creates a client factory using the following rules: +// 1. builds from the given kubeconfig path, if not empty +// 2. use the in cluster factory if running in-cluster +// 3. gets the factory from KUBECONFIG env var +// 4. Uses $HOME/.kube/factory +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRawKubeConfigLoader() clientcmd.ClientConfig { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + if len(f.KubeConfig) != 0 { + loadingRules.ExplicitPath = f.KubeConfig + } + configOverrides := &clientcmd.ConfigOverrides{ + ClusterDefaults: clientcmd.ClusterDefaults, + } + if len(f.Context) != 0 { + configOverrides.CurrentContext = f.Context + } + + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) +} + +// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive +var overlyCautiousIllegalFileCharacters = regexp.MustCompile(`[^(\w/\.)]`) + +// ToDiscoveryClient returns a CachedDiscoveryInterface using a computed RESTConfig +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { + // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToDiscoveryClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + factory.Burst = 100 + defaultHTTPCacheDir := filepath.Join(homedir.HomeDir(), ".kube", "http-cache") + + // takes the parentDir and the host and comes up with a "usually non-colliding" name for the discoveryCacheDir + parentDir := filepath.Join(homedir.HomeDir(), ".kube", "cache", "discovery") + // strip the optional scheme from host if its there: + schemelessHost := strings.Replace(strings.Replace(factory.Host, "https://", "", 1), "http://", "", 1) + // now do a simple collapse of non-AZ09 characters. Collisions are possible but unlikely. Even if we do collide the problem is short lived + safeHost := overlyCautiousIllegalFileCharacters.ReplaceAllString(schemelessHost, "_") + discoveryCacheDir := filepath.Join(parentDir, safeHost) + + return diskcached.NewCachedDiscoveryClientForConfig(factory, discoveryCacheDir, defaultHTTPCacheDir, time.Duration(10*time.Minute)) +} + +// ToRESTMapper returns a mapper +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRESTMapper() (meta.RESTMapper, error) { + // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToRESTMapper() + discoveryClient, err := f.ToDiscoveryClient() + if err != nil { + return nil, err + } + + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + expander := restmapper.NewShortcutExpander(mapper, discoveryClient) + return expander, nil +} + +// KubernetesClientSet creates a kubernetes clientset from the configuration +// It's required to implement the Factory interface +func (f *factory) KubernetesClientSet() (*kubernetes.Clientset, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) KubernetesClientSet() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(factory) +} + +// DynamicClient creates a dynamic client from the configuration +// It's required to implement the Factory interface +func (f *factory) DynamicClient() (dynamic.Interface, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) DynamicClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return dynamic.NewForConfig(factory) +} + +// NewBuilder returns a new resource builder for structured api objects. +// It's required to implement the Factory interface +func (f *factory) NewBuilder() *resource.Builder { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) NewBuilder() + return resource.NewBuilder(f) +} + +// RESTClient creates a REST client from the configuration +// It's required to implement the Factory interface +func (f *factory) RESTClient() (*rest.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) RESTClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return rest.RESTClientFor(factory) +} + +func (f *factory) configForMapping(mapping *meta.RESTMapping) (*rest.Config, error) { + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + + gvk := mapping.GroupVersionKind + factory.APIPath = "/apis" + if gvk.Group == corev1.GroupName { + factory.APIPath = "/api" + } + gv := gvk.GroupVersion() + factory.GroupVersion = &gv + + return factory, nil +} + +// ClientForMapping creates a resource REST client from the given mappings +// It's required to implement the Factory interface +func (f *factory) ClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) ClientForMapping() + factory, err := f.configForMapping(mapping) + if err != nil { + return nil, err + } + + return rest.RESTClientFor(factory) +} + +// UnstructuredClientForMapping creates a unstructured resource REST client from the given mappings +// It's required to implement the Factory interface +func (f *factory) UnstructuredClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) UnstructuredClientForMapping() + factory, err := f.configForMapping(mapping) + if err != nil { + return nil, err + } + factory.ContentConfig = resource.UnstructuredPlusDefaultContentConfig() + + return rest.RESTClientFor(factory) +} + +// Validator returns a schema that can validate objects stored on disk. +// It's required to implement the Factory interface +func (f *factory) Validator(validate bool) (validation.Schema, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) Validator(bool) + if !validate { + return validation.NullSchema{}, nil + } + + resources, err := f.OpenAPISchema() + if err != nil { + return nil, err + } + + return validation.ConjunctiveSchema{ + openapivalidation.NewSchemaValidation(resources), + validation.NoDoubleKeySchema{}, + }, nil +} + +// OpenAPISchema returns metadata and structural information about Kubernetes object definitions. +// It's required to implement the Factory interface +func (f *factory) OpenAPISchema() (openapi.Resources, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client-access.go > func (*factoryImpl) OpenAPISchema() + discovery, err := f.ToDiscoveryClient() + if err != nil { + return nil, err + } + + f.initOpenAPIGetterOnce.Do(func() { + // Create the caching OpenAPIGetter + f.openAPIGetter = openapi.NewOpenAPIGetter(discovery) + }) + + // Delegate to the OpenAPIGetter + return f.openAPIGetter.Get() +} diff --git a/src/rsync/pkg/client/helpers.go b/src/rsync/pkg/client/helpers.go new file mode 100644 index 00000000..de6ed93f --- /dev/null +++ b/src/rsync/pkg/client/helpers.go @@ -0,0 +1,75 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// CreateNamespace creates a namespace with the given name +func (c *Client) CreateNamespace(namespace string) error { + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Labels: map[string]string{ + "name": namespace, + }, + }, + } + _, err := c.Clientset.CoreV1().Namespaces().Create(ns) + // if errors.IsAlreadyExists(err) { + // // If it failed because the NS is already there, then do not return such error + // return nil + // } + + return err +} + +// DeleteNamespace deletes the namespace with the given name +func (c *Client) DeleteNamespace(namespace string) error { + return c.Clientset.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{}) +} + +// NodesReady returns the number of nodes ready +func (c *Client) NodesReady() (ready int, total int, err error) { + nodes, err := c.Clientset.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return 0, 0, err + } + total = len(nodes.Items) + if total == 0 { + return 0, 0, nil + } + for _, n := range nodes.Items { + for _, c := range n.Status.Conditions { + if c.Type == "Ready" && c.Status == "True" { + ready++ + break + } + } + } + + return ready, len(nodes.Items), nil +} + +// Version returns the cluster version. It can be used to verify if the cluster +// is reachable. It will return an error if failed to connect. +func (c *Client) Version() (string, error) { + v, err := c.Clientset.ServerVersion() + if err != nil { + return "", err + } + + return v.String(), nil +} diff --git a/src/rsync/pkg/client/patch.go b/src/rsync/pkg/client/patch.go new file mode 100644 index 00000000..3a620f49 --- /dev/null +++ b/src/rsync/pkg/client/patch.go @@ -0,0 +1,216 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "fmt" + "os" + "time" + + "github.com/jonboulle/clockwork" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + "k8s.io/apimachinery/pkg/util/mergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + oapi "k8s.io/kube-openapi/pkg/util/proto" + "k8s.io/kubectl/pkg/scheme" + "k8s.io/kubectl/pkg/util" + "k8s.io/kubectl/pkg/util/openapi" +) + +const ( + // overwrite if true, automatically resolve conflicts between the modified and live configuration by using values from the modified configuration + overwrite = true + // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure + maxPatchRetry = 5 + // backOffPeriod is the period to back off when apply patch results in error. + backOffPeriod = 1 * time.Second + // how many times we can retry before back off + triesBeforeBackOff = 1 + // force if true, immediately remove resources from API and bypass graceful deletion. Note that immediate deletion of some resources may result in inconsistency or data loss and requires confirmation. + force = false + // timeout waiting for the resource to be delete if it needs to be recreated + timeout = 0 +) + +// patch tries to patch an OpenAPI resource +func patch(info *resource.Info, current runtime.Object) error { + // From: k8s.io/kubectl/pkg/cmd/apply/apply.go & patcher.go + modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme) + if err != nil { + return fmt.Errorf("retrieving modified configuration. %s", err) + } + + metadata, _ := meta.Accessor(current) + annotationMap := metadata.GetAnnotations() + if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok { + // TODO: Find what to do with the warnings, they should not be printed + fmt.Fprintf(os.Stderr, "Warning: apply should be used on resource created by apply") + } + + patchBytes, patchObject, err := patchSimple(current, modified, info) + + var getErr error + for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { + if i > triesBeforeBackOff { + clockwork.NewRealClock().Sleep(backOffPeriod) + } + current, getErr = resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, false) + if getErr != nil { + return getErr + } + patchBytes, patchObject, err = patchSimple(current, modified, info) + } + if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && force { + patchBytes, patchObject, err = deleteAndCreate(info, patchBytes) + } + + info.Refresh(patchObject, true) + + return nil +} + +func patchSimple(currentObj runtime.Object, modified []byte, info *resource.Info) ([]byte, runtime.Object, error) { + // Serialize the current configuration of the object from the server. + current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, currentObj) + if err != nil { + return nil, nil, fmt.Errorf("serializing current configuration. %s", err) + } + + // Retrieve the original configuration of the object from the annotation. + original, err := util.GetOriginalConfiguration(currentObj) + if err != nil { + return nil, nil, fmt.Errorf("retrieving original configuration. %s", err) + } + + var patchType types.PatchType + var patch []byte + var lookupPatchMeta strategicpatch.LookupPatchMeta + var schema oapi.Schema + + // Create the versioned struct from the type defined in the restmapping + // (which is the API version we'll be submitting the patch to) + versionedObject, err := scheme.Scheme.New(info.Mapping.GroupVersionKind) + + // DEBUG: + // fmt.Printf("Modified: %v\n", string(modified)) + // fmt.Printf("Current: %v\n", string(current)) + // fmt.Printf("Original: %v\n", string(original)) + // fmt.Printf("versionedObj: %v\n", versionedObject) + // fmt.Printf("Error: %+v\nIsNotRegisteredError: %t\n", err, runtime.IsNotRegisteredError(err)) + + switch { + case runtime.IsNotRegisteredError(err): + // fall back to generic JSON merge patch + patchType = types.MergePatchType + preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), + mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} + patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) + if err != nil { + if mergepatch.IsPreconditionFailed(err) { + return nil, nil, fmt.Errorf("At least one of apiVersion, kind and name was changed") + } + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + case err != nil: + return nil, nil, fmt.Errorf("getting instance of versioned object. %s", err) + case err == nil: + // Compute a three way strategic merge patch to send to server. + patchType = types.StrategicMergePatchType + + // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. + // Otherwise, fall back to baked-in types. + var openapiSchema openapi.Resources + if openapiSchema != nil { + if schema = openapiSchema.LookupResource(info.Mapping.GroupVersionKind); schema != nil { + lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} + if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite); err == nil { + patchType = types.StrategicMergePatchType + patch = openapiPatch + // TODO: In case it's necessary to report warnings + // } else { + // log.Printf("Warning: error calculating patch from openapi spec: %s", err) + } + } + } + + if patch == nil { + lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) + if err != nil { + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite) + if err != nil { + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + } + } + + if string(patch) == "{}" { + return patch, currentObj, nil + } + + patchedObj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, patchType, patch, nil) + return patch, patchedObj, err +} + +func deleteAndCreate(info *resource.Info, modified []byte) ([]byte, runtime.Object, error) { + delOptions := defaultDeleteOptions() + if _, err := deleteWithOptions(info, delOptions); err != nil { + return nil, nil, err + } + + helper := resource.NewHelper(info.Client, info.Mapping) + + // TODO: make a waiter and use it + if err := wait.PollImmediate(1*time.Second, time.Duration(timeout), func() (bool, error) { + if _, err := helper.Get(info.Namespace, info.Name, false); !errors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return nil, nil, err + } + + // TODO: Check what GetModifiedConfiguration does, this could be an encode - decode waste of time + // modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme) + // if err != nil { + // return nil, nil, fmt.Errorf("retrieving modified configuration. %s", err) + // } + versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) + if err != nil { + return nil, nil, err + } + + options := metav1.CreateOptions{} + createdObject, err := helper.Create(info.Namespace, true, versionedObject, &options) + if err != nil { + // restore the original object if we fail to create the new one + // but still propagate and advertise error to user + recreated, recreateErr := helper.Create(info.Namespace, true, info.Object, &options) + if recreateErr != nil { + err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one. %v.\n\nAdditionally, an error occurred attempting to restore the original object: %v", err, recreateErr) + } else { + createdObject = recreated + } + } + return modified, createdObject, err +} diff --git a/src/rsync/pkg/client/replace.go b/src/rsync/pkg/client/replace.go new file mode 100644 index 00000000..9aba8aca --- /dev/null +++ b/src/rsync/pkg/client/replace.go @@ -0,0 +1,51 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "k8s.io/cli-runtime/pkg/resource" +) + +// Replace creates a resource with the given content +func (c *Client) Replace(content []byte) error { + r := c.ResultForContent(content, nil) + return c.ReplaceResource(r) +} + +// ReplaceFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) ReplaceFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.ReplaceResource(r) +} + +// ReplaceResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) ReplaceResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(replace) +} + +func replace(info *resource.Info, err error) error { + if err != nil { + return failedTo("replace", info, err) + } + + obj, err := resource.NewHelper(info.Client, info.Mapping).Replace(info.Namespace, info.Name, true, info.Object) + if err != nil { + return failedTo("replace", info, err) + } + info.Refresh(obj, true) + + return nil +} diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go index fc8aa839..2d15d7ec 100644 --- a/src/rsync/pkg/connector/connector.go +++ b/src/rsync/pkg/connector/connector.go @@ -1,94 +1,122 @@ /* - * Copyright 2019 Intel Corporation, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +Copyright 2020 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package connector import ( + "encoding/base64" + "fmt" "log" + "os" + "strings" + "sync" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" + kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" + pkgerrors "github.com/pkg/errors" ) -// KubernetesConnector is an interface that is expected to be implemented -// by any code that calls the plugin framework functions. -// It implements methods that are needed by the plugins to get Kubernetes -// clients and other information needed to interface with Kubernetes -type KubernetesConnector interface { - //GetMapper returns the RESTMapper that was created for this client - GetMapper() meta.RESTMapper - - //GetDynamicClient returns the dynamic client that is needed for - //unstructured REST calls to the apiserver - GetDynamicClient() dynamic.Interface +type Connector struct { + cid string + Clients map[string]*kubeclient.Client + sync.Mutex +} - // GetStandardClient returns the standard client that can be used to handle - // standard kubernetes kinds - GetStandardClient() kubernetes.Interface +const basePath string = "/tmp/rsync/" - //GetInstanceID returns the InstanceID for tracking during creation - GetInstanceID() string +// Init connector for an app context +func Init(id interface{}) *Connector { + c := make(map[string]*kubeclient.Client) + str := fmt.Sprintf("%v", id) + return &Connector{ + Clients: c, + cid: str, + } } -// Reference is the interface that is implemented -type Reference interface { - //Create a kubernetes resource described by the yaml in yamlFilePath - Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error) - //Delete a kubernetes resource described in the provided namespace - Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. +func getKubeConfig(clustername string) ([]byte, error) { + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil } -// TagPodsIfPresent finds the PodTemplateSpec from any workload -// object that contains it and changes the spec to include the tag label -func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { +// GetClient returns client for the cluster +func (c *Connector) GetClient(cluster string) (*kubeclient.Client, error) { + c.Lock() + defer c.Unlock() - spec, ok := unstruct.Object["spec"].(map[string]interface{}) + client, ok := c.Clients[cluster] if !ok { - log.Println("Error converting spec to map") - return - } - - template, ok := spec["template"].(map[string]interface{}) - if !ok { - log.Println("Error converting template to map") - return + // Get file from DB + dec, err := getKubeConfig(cluster) + if err != nil { + return nil, err + } + var kubeConfigPath string = basePath + c.cid + "/" + cluster + "/" + if _, err := os.Stat(kubeConfigPath); os.IsNotExist(err) { + err = os.MkdirAll(kubeConfigPath, 0755) + if err != nil { + return nil, err + } + } + kubeConfig := kubeConfigPath + "config" + f, err := os.Create(kubeConfig) + if err != nil { + return nil, err + } + _, err = f.Write(dec) + if err != nil { + return nil, err + } + client = kubeclient.New("", kubeConfig, "default") + if client != nil { + c.Clients[cluster] = client + } } + return client, nil +} - //Attempt to convert the template to a podtemplatespec. - //This is to check if we have any pods being created. - podTemplateSpec := &corev1.PodTemplateSpec{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec) +func (c *Connector) GetClientWithRetry(cluster string) (*kubeclient.Client, error) { + client, err := c.GetClient(cluster) if err != nil { - log.Println("Did not find a podTemplateSpec: " + err.Error()) - return + return nil, err } - - labels := podTemplateSpec.GetLabels() - if labels == nil { - labels = map[string]string{} + if err = client.IsReachable(); err != nil { + return nil, err // TODO: Add retry } - labels["emco/deployment-id"] = tag - podTemplateSpec.SetLabels(labels) - - updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) + return client, nil +} - //Set the label - spec["template"] = updatedTemplate +func (c *Connector) RemoveClient() { + c.Lock() + defer c.Unlock() + err := os.RemoveAll(basePath + "/" + c.cid) + if err != nil { + log.Printf("Warning: Deleting kubepath %s", err) + } } diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index 7e0fce3c..3ce6ee9b 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -17,503 +17,231 @@ package context import ( + "context" "encoding/json" "fmt" "log" "strings" - "sync" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" - "github.com/onap/multicloud-k8s/src/rsync/pkg/app" - con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource" + "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" + kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client" + connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" + utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" status "github.com/onap/multicloud-k8s/src/rsync/pkg/status" pkgerrors "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) type CompositeAppContext struct { - cid interface{} - appsorder string - appsdependency string - appsmap []instMap -} -type clusterInfo struct { - name string - resorder string - resdependency string - ressmap []instMap -} -type instMap struct { - name string - depinfo string - status string - rerr error - clusters []clusterInfo -} - -func getInstMap(order string, dependency string, level string) ([]instMap, error) { - - if order == "" { - return nil, pkgerrors.Errorf("Not a valid order value") - } - if dependency == "" { - return nil, pkgerrors.Errorf("Not a valid dependency value") - } - - if !(level == "app" || level == "res") { - return nil, pkgerrors.Errorf("Not a valid level name given to create map") - } - - var aov map[string]interface{} - json.Unmarshal([]byte(order), &aov) - - s := fmt.Sprintf("%vorder", level) - appso := aov[s].([]interface{}) - var instmap = make([]instMap, len(appso)) - - var adv map[string]interface{} - json.Unmarshal([]byte(dependency), &adv) - s = fmt.Sprintf("%vdependency", level) - appsd := adv[s].(map[string]interface{}) - for i, u := range appso { - instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil} - } - - return instmap, nil + cid interface{} } -func deleteResource(clustername string, resname string, respath string) error { - k8sClient := app.KubernetesClient{} - err := k8sClient.Init(clustername, resname) +func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) { + var byteRes []byte + rh, err := ac.GetResourceHandle(app, cluster, name) if err != nil { - log.Println("Init failed: " + err.Error()) - return err + return nil, err } - - var c con.KubernetesConnector - c = &k8sClient - var gp res.Resource - err = gp.Delete(respath, resname, "default", c) - if err != nil { - log.Println("Delete resource failed: " + err.Error() + resname) - return err - } - log.Println("Resource succesfully deleted", resname) - return nil - -} - -func createResource(clustername string, resname string, respath string, label string) error { - k8sClient := app.KubernetesClient{} - err := k8sClient.Init(clustername, resname) - if err != nil { - log.Println("Client init failed: " + err.Error()) - return err - } - - var c con.KubernetesConnector - c = &k8sClient - var gp res.Resource - _, err = gp.Create(respath, "default", label, c) - if err != nil { - log.Println("Create failed: " + err.Error() + resname) - return err - } - log.Println("Resource succesfully created", resname) - return nil - -} - -func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error { - - rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) - if err != nil { - return err - } - resval, err := ac.GetValue(rh) if err != nil { - return err + return nil, err } - if resval != "" { - result := strings.Split(resmap.name, "+") + result := strings.Split(name, "+") if result[0] == "" { - return pkgerrors.Errorf("Resource name is nil") - } - err = deleteResource(clustername, result[0], resval.(string)) - if err != nil { - return err + return nil, pkgerrors.Errorf("Resource name is nil %s:", name) } + byteRes = []byte(fmt.Sprintf("%v", resval.(interface{}))) } else { - return pkgerrors.Errorf("Resource value is nil") + return nil, pkgerrors.Errorf("Resource value is nil %s", name) } - - return nil - + return byteRes, nil } -func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error { - rh, err := ac.GetResourceHandle(appname, clustername, resmap.name) +func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { + res, err := getRes(ac, name, app, cluster) if err != nil { return err } - - resval, err := ac.GetValue(rh) - if err != nil { + if err := c.Delete(res); err != nil { + logutils.Error("Failed to delete res", logutils.Fields{ + "error": err, + "resource": name, + }) return err } - - if resval != "" { - result := strings.Split(resmap.name, "+") - if result[0] == "" { - return pkgerrors.Errorf("Resource name is nil") - } - err = createResource(clustername, result[0], resval.(string), label) - if err != nil { - return err - } - } else { - return pkgerrors.Errorf("Resource value is nil") - } - + logutils.Info("Deleted::", logutils.Fields{ + "cluster": cluster, + "resource": res, + }) return nil - } -func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <-chans[index] - if c != index { - ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) - for j := 0; j < len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) +func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error { + res, err := getRes(ac, name, app, cluster) + if err != nil { + return err } - wg.Wait() - for k := 0; k < len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources termination") - } + //Decode the yaml to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err = utils.DecodeYAMLData(string(res), unstruct) + if err != nil { + return pkgerrors.Wrap(err, "Decode deployment object error") } - return nil - -} - -func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(ressmap)) - cid, _ := ac.GetCompositeAppHandle() - results := strings.Split(cid.(string), "/") - label := results[2] + "-" + appname - - for l := range chans { - chans[l] = make(chan int) + //Add the tracking label to all resources created here + labels := unstruct.GetLabels() + //Check if labels exist for this object + if labels == nil { + labels = map[string]string{} } - for i := 0; i < len(ressmap); i++ { - wg.Add(1) - go func(index int) { - if ressmap[index].depinfo == "go" { - ressmap[index].status = "start" - } else { - ressmap[index].status = "waiting" - c := <-chans[index] - if c != index { - ressmap[index].status = "error" - ressmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - ressmap[index].status = "start" - } - ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label) - ressmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", ressmap[index].name) - for j := 0; j < len(ressmap); j++ { - if ressmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(ressmap); k++ { - if ressmap[k].rerr != nil { - return pkgerrors.Errorf("error during resources instantiation") - } - } - return nil - -} - -func terminateApp(ac appcontext.AppContext, appmap instMap) error { + //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + labels["emco/deployment-id"] = label + unstruct.SetLabels(labels) - for i := 0; i < len(appmap.clusters); i++ { - err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) - if err != nil { - return err - } + // This checks if the resource we are creating has a podSpec in it + // Eg: Deployment, StatefulSet, Job etc.. + // If a PodSpec is found, the label will be added to it too. + //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) + utils.TagPodsIfPresent(unstruct, label) + b, err := unstruct.MarshalJSON() + if err != nil { + logutils.Error("Failed to MarshalJSON", logutils.Fields{ + "error": err, + "resource": name, + }) + return err } - log.Println("Termination of app done: " + appmap.name) - - return nil - -} - -func instantiateApp(ac appcontext.AppContext, appmap instMap) error { - - for i := 0; i < len(appmap.clusters); i++ { - err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name, - appmap.clusters[i].name) - if err != nil { - return err - } - err = status.StartClusterWatcher(appmap.clusters[i].name) - if err != nil { - log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err) - } + if err := c.Apply(b); err != nil { + logutils.Error("Failed to apply res", logutils.Fields{ + "error": err, + "resource": name, + }) + return err } - log.Println("Instantiation of app done: " + appmap.name) + logutils.Info("Installed::", logutils.Fields{ + "cluster": cluster, + "resource": res, + }) return nil - } -func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(appsmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(appsmap); i++ { - wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { - appsmap[index].status = "start" - } else { - appsmap[index].status = "waiting" - c := <-chans[index] - if c != index { - appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - appsmap[index].status = "start" - } - appsmap[index].rerr = instantiateApp(ac, appsmap[index]) - appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) - for j := 0; j < len(appsmap); j++ { - if appsmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(appsmap); k++ { - if appsmap[k].rerr != nil { - return pkgerrors.Errorf("error during apps instantiation") - } - } - return nil - -} +type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error -func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { +func applyFnComApp(cid interface{}, con *connector.Connector, f fn, breakonError bool) error { ac := appcontext.AppContext{} - + g, _ := errgroup.WithContext(context.Background()) _, err := ac.LoadAppContext(cid) if err != nil { return err } - instca.cid = cid - - appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } - instca.appsorder = appsorder.(string) - appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } - instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + appsOrder, err := ac.GetAppInstruction("order") if err != nil { return err } + var appList map[string][]string + json.Unmarshal([]byte(appsOrder.(string)), &appList) + logutils.Info("appsorder ", logutils.Fields{ + "appsorder": appsOrder, + "string": appList, + }) + id, _ := ac.GetCompositeAppHandle() - for j := 0; j < len(instca.appsmap); j++ { - clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) - if err != nil { - return err - } - instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k := 0; k < len(clusternames); k++ { - instca.appsmap[j].clusters[k].name = clusternames[k] - resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") - if err != nil { - return err - } - instca.appsmap[j].clusters[k].resorder = resorder.(string) + for _, app := range appList["apporder"] { - resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") + appName := app + results := strings.Split(id.(string), "/") + label := results[2] + "-" + app + g.Go(func() error { + clusterNames, err := ac.GetClusterNames(appName) if err != nil { return err } - instca.appsmap[j].clusters[k].resdependency = resdependency.(string) - - instca.appsmap[j].clusters[k].ressmap, err = getInstMap( - instca.appsmap[j].clusters[k].resorder, - instca.appsmap[j].clusters[k].resdependency, "res") - if err != nil { + rg, _ := errgroup.WithContext(context.Background()) + for k := 0; k < len(clusterNames); k++ { + cluster := clusterNames[k] + err = status.StartClusterWatcher(cluster) + if err != nil { + log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) + } + rg.Go(func() error { + c, err := con.GetClient(cluster) + if err != nil { + logutils.Error("Error in creating kubeconfig client", logutils.Fields{ + "error": err, + "cluster": cluster, + "appName": appName, + }) + return err + } + resorder, err := ac.GetResourceInstruction(appName, cluster, "order") + if err != nil { + logutils.Error("Resorder error ", logutils.Fields{"error": err}) + return err + } + var aov map[string][]string + json.Unmarshal([]byte(resorder.(string)), &aov) + for _, res := range aov["resorder"] { + err = f(ac, c, res, appName, cluster, label) + if err != nil { + logutils.Error("Error in resource %s: %v", logutils.Fields{ + "error": err, + "cluster": cluster, + "resource": res, + }) + if breakonError { + return err + } + } + } + return nil + }) + } + if err := rg.Wait(); err != nil { + logutils.Error("Encountered error in App cluster", logutils.Fields{ + "error": err, + }) return err } - } + return nil + }) } - err = instantiateApps(ac, instca.appsmap) - if err != nil { + if err := g.Wait(); err != nil { + logutils.Error("Encountered error", logutils.Fields{ + "error": err, + }) return err } - return nil } -// Delete all the apps -func terminateApps(ac appcontext.AppContext, appsmap []instMap) error { - var wg sync.WaitGroup - var chans = make([]chan int, len(appsmap)) - for l := range chans { - chans[l] = make(chan int) - } - for i := 0; i < len(appsmap); i++ { - wg.Add(1) - go func(index int) { - if appsmap[index].depinfo == "go" { - appsmap[index].status = "start" - } else { - appsmap[index].status = "waiting" - c := <-chans[index] - if c != index { - appsmap[index].status = "error" - appsmap[index].rerr = pkgerrors.Errorf("channel does not match") - wg.Done() - return - } - appsmap[index].status = "start" - } - appsmap[index].rerr = terminateApp(ac, appsmap[index]) - appsmap[index].status = "done" - waitstr := fmt.Sprintf("wait on %v", appsmap[index].name) - for j := 0; j < len(appsmap); j++ { - if appsmap[j].depinfo == waitstr { - chans[j] <- j - } - } - wg.Done() - }(i) - } - wg.Wait() - for k := 0; k < len(appsmap); k++ { - if appsmap[k].rerr != nil { - return pkgerrors.Errorf("error during apps instantiation") - } +// InstantiateComApp Instantiate Apps in Composite App +func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { + con := connector.Init(cid) + err := applyFnComApp(cid, con, instantiateResource, true) + if err != nil { + logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err}) + return err } + //Cleanup + con.RemoveClient() return nil - } -// Delete all the resources for a given context +// TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - ac := appcontext.AppContext{} - - _, err := ac.LoadAppContext(cid) - if err != nil { - return err - } - instca.cid = cid - - appsorder, err := ac.GetAppInstruction("order") - if err != nil { - return err - } - instca.appsorder = appsorder.(string) - appsdependency, err := ac.GetAppInstruction("dependency") - if err != nil { - return err - } - instca.appsdependency = appsdependency.(string) - instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app") + con := connector.Init(cid) + err := applyFnComApp(cid, con, terminateResource, false) if err != nil { + logutils.Error("TerminateComApp unsuccessful", logutils.Fields{ + "error": err, + }) return err } - - for j := 0; j < len(instca.appsmap); j++ { - clusternames, err := ac.GetClusterNames(instca.appsmap[j].name) - if err != nil { - return err - } - instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames)) - for k := 0; k < len(clusternames); k++ { - instca.appsmap[j].clusters[k].name = clusternames[k] - resorder, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "order") - if err != nil { - return err - } - instca.appsmap[j].clusters[k].resorder = resorder.(string) - - resdependency, err := ac.GetResourceInstruction( - instca.appsmap[j].name, clusternames[k], "dependency") - if err != nil { - return err - } - instca.appsmap[j].clusters[k].resdependency = resdependency.(string) - - instca.appsmap[j].clusters[k].ressmap, err = getInstMap( - instca.appsmap[j].clusters[k].resorder, - instca.appsmap[j].clusters[k].resdependency, "res") - if err != nil { - return err - } - } - } - err = terminateApps(ac, instca.appsmap) - if err != nil { - return err - } - + //Cleanup + con.RemoveClient() return nil - } diff --git a/src/rsync/pkg/internal/utils.go b/src/rsync/pkg/internal/utils.go index 270edba5..09415ed5 100644 --- a/src/rsync/pkg/internal/utils.go +++ b/src/rsync/pkg/internal/utils.go @@ -1,5 +1,5 @@ /* -Copyright 2018 Intel Corporation. +Copyright 2020 Intel Corporation. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -15,10 +15,14 @@ package utils import ( "io/ioutil" + "log" "os" "path" + corev1 "k8s.io/api/core/v1" + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" ) @@ -68,3 +72,41 @@ func EnsureDirectory(f string) error { } return os.MkdirAll(base, 0755) } + +// TagPodsIfPresent finds the PodTemplateSpec from any workload +// object that contains it and changes the spec to include the tag label +func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) { + + spec, ok := unstruct.Object["spec"].(map[string]interface{}) + if !ok { + log.Println("Error converting spec to map") + return + } + + template, ok := spec["template"].(map[string]interface{}) + if !ok { + //log.Println("Error converting template to map") + return + } + log.Println("Apply label in template") + //Attempt to convert the template to a podtemplatespec. + //This is to check if we have any pods being created. + podTemplateSpec := &corev1.PodTemplateSpec{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec) + if err != nil { + log.Println("Did not find a podTemplateSpec: " + err.Error()) + return + } + + labels := podTemplateSpec.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labels["emco/deployment-id"] = tag + podTemplateSpec.SetLabels(labels) + + updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec) + + //Set the label + spec["template"] = updatedTemplate +} diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go deleted file mode 100644 index 2877e2a3..00000000 --- a/src/rsync/pkg/resource/resource.go +++ /dev/null @@ -1,130 +0,0 @@ -/* -Copyright 2018 Intel Corporation. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resource - -import ( - pkgerrors "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - - "github.com/onap/multicloud-k8s/src/rsync/pkg/connector" - utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal" -) - -type Resource struct { -} - -// Create deployment object in a specific Kubernetes cluster -func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return "", pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return "", pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - //Add the tracking label to all resources created here - labels := unstruct.GetLabels() - //Check if labels exist for this object - if labels == nil { - labels = map[string]string{} - } - //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() - labels["emco/deployment-id"] = label - unstruct.SetLabels(labels) - - // This checks if the resource we are creating has a podSpec in it - // Eg: Deployment, StatefulSet, Job etc.. - // If a PodSpec is found, the label will be added to it too. - //connector.TagPodsIfPresent(unstruct, client.GetInstanceID()) - connector.TagPodsIfPresent(unstruct, label) - - gvr := mapping.Resource - var createdObj *unstructured.Unstructured - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - createdObj, err = dynClient.Resource(gvr).Namespace(namespace).Create(unstruct, metav1.CreateOptions{}) - case meta.RESTScopeNameRoot: - createdObj, err = dynClient.Resource(gvr).Create(unstruct, metav1.CreateOptions{}) - default: - return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String()) - } - - if err != nil { - return "", pkgerrors.Wrap(err, "Create object error") - } - - return createdObj.GetName(), nil -} - -// Delete an existing resource hosted in a specific Kubernetes cluster -func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAMLData(data, unstruct) - if err != nil { - return pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - gvr := mapping.Resource - deletePolicy := metav1.DeletePropagationForeground - opts := &metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - } - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts) - case meta.RESTScopeNameRoot: - err = dynClient.Resource(gvr).Delete(resname, opts) - default: - return pkgerrors.New("Got an unknown RESTSCopeName for mappin") - } - - if err != nil { - return pkgerrors.Wrap(err, "Delete object error") - } - return nil -} |