summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg
diff options
context:
space:
mode:
authorEric Multanen <eric.w.multanen@intel.com>2020-07-15 18:43:10 +0000
committerGerrit Code Review <gerrit@onap.org>2020-07-15 18:43:10 +0000
commitad17b4360890fc2915795515ac265fc66720f4ad (patch)
treebf5c2f68b3b43178f661f7c890e9f02a496e9b56 /src/rsync/pkg
parent8223d0671617ee6dcc68307aefd3634e1bb0ac8d (diff)
parentb986e8938aaa26945dc7dcdcb990ec8aa53afff0 (diff)
Merge "Update Rsync"
Diffstat (limited to 'src/rsync/pkg')
-rw-r--r--src/rsync/pkg/app/client.go130
-rw-r--r--src/rsync/pkg/client/apply.go93
-rw-r--r--src/rsync/pkg/client/client.go190
-rw-r--r--src/rsync/pkg/client/create.go55
-rw-r--r--src/rsync/pkg/client/delete.go95
-rw-r--r--src/rsync/pkg/client/factory.go291
-rw-r--r--src/rsync/pkg/client/helpers.go75
-rw-r--r--src/rsync/pkg/client/patch.go216
-rw-r--r--src/rsync/pkg/client/replace.go51
-rw-r--r--src/rsync/pkg/connector/connector.go168
-rw-r--r--src/rsync/pkg/context/context.go578
-rw-r--r--src/rsync/pkg/internal/utils.go44
-rw-r--r--src/rsync/pkg/resource/resource.go130
13 files changed, 1360 insertions, 756 deletions
diff --git a/src/rsync/pkg/app/client.go b/src/rsync/pkg/app/client.go
deleted file mode 100644
index 49997ed0..00000000
--- a/src/rsync/pkg/app/client.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-Copyright 2018 Intel Corporation.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
- "os"
- "strings"
- "time"
- "encoding/base64"
-
- pkgerrors "github.com/pkg/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- "k8s.io/client-go/discovery/cached/disk"
- "k8s.io/client-go/dynamic"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/restmapper"
- "k8s.io/client-go/tools/clientcmd"
-
- "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
-)
-
-
-// KubernetesClient encapsulates the different clients' interfaces
-// we need when interacting with a Kubernetes cluster
-type KubernetesClient struct {
- clientSet kubernetes.Interface
- dynamicClient dynamic.Interface
- discoverClient *disk.CachedDiscoveryClient
- restMapper meta.RESTMapper
- instanceID string
-}
-
-// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
-// of the clustername. This is written out to a file.
-func (k *KubernetesClient) getKubeConfig(clustername string, id string) ([]byte, error) {
-
- if !strings.Contains(clustername, "+") {
- return nil, pkgerrors.New("Not a valid cluster name")
- }
- strs := strings.Split(clustername, "+")
- if len(strs) != 2 {
- return nil, pkgerrors.New("Not a valid cluster name")
- }
- kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
- if err != nil {
- return nil, pkgerrors.New("Get kubeconfig failed")
- }
-
- dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
- if err != nil {
- return nil, err
- }
- return dec, nil
-}
-
-// init loads the Kubernetes configuation values stored into the local configuration file
-func (k *KubernetesClient) Init(clustername string, iid string) error {
- if clustername == "" {
- return pkgerrors.New("Cloudregion is empty")
- }
-
- if iid == "" {
- return pkgerrors.New("Instance ID is empty")
- }
-
- k.instanceID = iid
-
- configData, err := k.getKubeConfig(clustername, iid)
- if err != nil {
- return pkgerrors.Wrap(err, "Get kubeconfig file")
- }
-
- config, err := clientcmd.RESTConfigFromKubeConfig(configData)
- if err != nil {
- return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
- }
-
- k.clientSet, err = kubernetes.NewForConfig(config)
- if err != nil {
- return err
- }
-
- k.dynamicClient, err = dynamic.NewForConfig(config)
- if err != nil {
- return pkgerrors.Wrap(err, "Creating dynamic client")
- }
-
- k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
- if err != nil {
- return pkgerrors.Wrap(err, "Creating discovery client")
- }
-
- k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
-
- return nil
-}
-
-//GetMapper returns the RESTMapper that was created for this client
-func (k *KubernetesClient) GetMapper() meta.RESTMapper {
- return k.restMapper
-}
-
-//GetDynamicClient returns the dynamic client that is needed for
-//unstructured REST calls to the apiserver
-func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
- return k.dynamicClient
-}
-
-// GetStandardClient returns the standard client that can be used to handle
-// standard kubernetes kinds
-func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
- return k.clientSet
-}
-
-//GetInstanceID returns the instanceID that is injected into all the
-//resources created by the plugin
-func (k *KubernetesClient) GetInstanceID() string {
- return k.instanceID
-}
diff --git a/src/rsync/pkg/client/apply.go b/src/rsync/pkg/client/apply.go
new file mode 100644
index 00000000..96233a31
--- /dev/null
+++ b/src/rsync/pkg/client/apply.go
@@ -0,0 +1,93 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/kubectl/pkg/util"
+)
+
+// Apply creates a resource with the given content
+func (c *Client) Apply(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.ApplyResource(r)
+}
+
+// ApplyFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) ApplyFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.ApplyResource(r)
+}
+
+// ApplyResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) ApplyResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+
+ // Is ServerSideApply requested
+ if c.ServerSideApply {
+ return r.Visit(serverSideApply)
+ }
+
+ return r.Visit(apply)
+}
+
+func apply(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("apply", info, err)
+ }
+
+ // If it does not exists, just create it
+ current, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, info.Export)
+ if err != nil {
+ if !errors.IsNotFound(err) {
+ return failedTo("retrieve current configuration", info, err)
+ }
+ if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
+ return failedTo("set annotation", info, err)
+ }
+ return create(info, nil)
+ }
+
+ // If exists, patch it
+ return patch(info, current)
+}
+
+func serverSideApply(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("serverside apply", info, err)
+ }
+
+ data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object)
+ if err != nil {
+ return failedTo("encode for the serverside apply", info, err)
+ }
+
+ options := metav1.PatchOptions{
+ // TODO: Find out how to get the force conflict flag
+ // Force: &forceConflicts,
+ // FieldManager: FieldManager,
+ }
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, types.ApplyPatchType, data, &options)
+ if err != nil {
+ return failedTo("serverside patch", info, err)
+ }
+ info.Refresh(obj, true)
+ return nil
+}
diff --git a/src/rsync/pkg/client/client.go b/src/rsync/pkg/client/client.go
new file mode 100644
index 00000000..0eaded22
--- /dev/null
+++ b/src/rsync/pkg/client/client.go
@@ -0,0 +1,190 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/kubectl/pkg/validation"
+)
+
+// DefaultValidation default action to validate. If `true` all resources by
+// default will be validated.
+const DefaultValidation = true
+
+// Client is a kubernetes client, like `kubectl`
+type Client struct {
+ Clientset *kubernetes.Clientset
+ factory *factory
+ validator validation.Schema
+ namespace string
+ enforceNamespace bool
+ forceConflicts bool
+ ServerSideApply bool
+}
+
+// Result is an alias for the Kubernetes CLI runtime resource.Result
+type Result = resource.Result
+
+// BuilderOptions parameters to create a Resource Builder
+type BuilderOptions struct {
+ Unstructured bool
+ Validate bool
+ Namespace string
+ LabelSelector string
+ FieldSelector string
+ All bool
+ AllNamespaces bool
+}
+
+// NewBuilderOptions creates a BuilderOptions with the default values for
+// the parameters to create a Resource Builder
+func NewBuilderOptions() *BuilderOptions {
+ return &BuilderOptions{
+ Unstructured: true,
+ Validate: true,
+ }
+}
+
+// NewE creates a kubernetes client, returns an error if fail
+func NewE(context, kubeconfig string, ns string) (*Client, error) {
+ var namespace string
+ var enforceNamespace bool
+ var err error
+ factory := newFactory(context, kubeconfig)
+
+ // If `true` it will always validate the given objects/resources
+ // Unless something different is specified in the NewBuilderOptions
+ validator, _ := factory.Validator(DefaultValidation)
+
+ if ns == "" {
+ namespace, enforceNamespace, err = factory.ToRawKubeConfigLoader().Namespace()
+ if err != nil {
+ namespace = v1.NamespaceDefault
+ enforceNamespace = true
+ }
+ } else {
+ namespace = ns
+ enforceNamespace = false
+ }
+ clientset, err := factory.KubernetesClientSet()
+ if err != nil {
+ return nil, err
+ }
+ if clientset == nil {
+ return nil, fmt.Errorf("cannot create a clientset from given context and kubeconfig")
+ }
+
+ return &Client{
+ factory: factory,
+ Clientset: clientset,
+ validator: validator,
+ namespace: namespace,
+ enforceNamespace: enforceNamespace,
+ }, nil
+}
+
+// New creates a kubernetes client
+func New(context, kubeconfig string, namespace string) *Client {
+ client, _ := NewE(context, kubeconfig, namespace)
+ return client
+}
+
+// Builder creates a resource builder
+func (c *Client) builder(opt *BuilderOptions) *resource.Builder {
+ validator := c.validator
+ namespace := c.namespace
+
+ if opt == nil {
+ opt = NewBuilderOptions()
+ } else {
+ if opt.Validate != DefaultValidation {
+ validator, _ = c.factory.Validator(opt.Validate)
+ }
+ if opt.Namespace != "" {
+ namespace = opt.Namespace
+ }
+ }
+
+ b := c.factory.NewBuilder()
+ if opt.Unstructured {
+ b = b.Unstructured()
+ }
+
+ return b.
+ Schema(validator).
+ ContinueOnError().
+ NamespaceParam(namespace).DefaultNamespace()
+}
+
+// ResultForFilenameParam returns the builder results for the given list of files or URLs
+func (c *Client) ResultForFilenameParam(filenames []string, opt *BuilderOptions) *Result {
+ filenameOptions := &resource.FilenameOptions{
+ Recursive: false,
+ Filenames: filenames,
+ }
+
+ return c.builder(opt).
+ FilenameParam(c.enforceNamespace, filenameOptions).
+ Flatten().
+ Do()
+}
+
+// ResultForReader returns the builder results for the given reader
+func (c *Client) ResultForReader(r io.Reader, opt *BuilderOptions) *Result {
+ return c.builder(opt).
+ Stream(r, "").
+ Flatten().
+ Do()
+}
+
+// func (c *Client) ResultForName(opt *BuilderOptions, names ...string) *Result {
+// return c.builder(opt).
+// LabelSelectorParam(opt.LabelSelector).
+// FieldSelectorParam(opt.FieldSelector).
+// SelectAllParam(opt.All).
+// AllNamespaces(opt.AllNamespaces).
+// ResourceTypeOrNameArgs(false, names...).RequireObject(false).
+// Flatten().
+// Do()
+// }
+
+// ResultForContent returns the builder results for the given content
+func (c *Client) ResultForContent(content []byte, opt *BuilderOptions) *Result {
+ b := bytes.NewBuffer(content)
+ return c.ResultForReader(b, opt)
+}
+
+func failedTo(action string, info *resource.Info, err error) error {
+ var resKind string
+ if info.Mapping != nil {
+ resKind = info.Mapping.GroupVersionKind.Kind + " "
+ }
+
+ return fmt.Errorf("cannot %s object Kind: %q, Name: %q, Namespace: %q. %s", action, resKind, info.Name, info.Namespace, err)
+}
+
+// IsReachable tests connectivity to the cluster
+func (c *Client) IsReachable() error {
+ client, _ := c.factory.KubernetesClientSet()
+ _, err := client.ServerVersion()
+ if err != nil {
+ return fmt.Errorf("Kubernetes cluster unreachable")
+ }
+ return nil
+} \ No newline at end of file
diff --git a/src/rsync/pkg/client/create.go b/src/rsync/pkg/client/create.go
new file mode 100644
index 00000000..755420ca
--- /dev/null
+++ b/src/rsync/pkg/client/create.go
@@ -0,0 +1,55 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+// Create creates a resource with the given content
+func (c *Client) Create(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.CreateResource(r)
+}
+
+// CreateFile creates a resource with the given content
+func (c *Client) CreateFile(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.CreateResource(r)
+}
+
+// CreateResource creates the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) CreateResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(create)
+}
+
+func create(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("create", info, err)
+ }
+
+ // TODO: If will be allow to do create then apply, here must be added the annotation as in Apply/Patch
+
+ options := metav1.CreateOptions{}
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options)
+ if err != nil {
+ return failedTo("create", info, err)
+ }
+ info.Refresh(obj, true)
+
+ return nil
+}
diff --git a/src/rsync/pkg/client/delete.go b/src/rsync/pkg/client/delete.go
new file mode 100644
index 00000000..1e6aa46a
--- /dev/null
+++ b/src/rsync/pkg/client/delete.go
@@ -0,0 +1,95 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+const (
+ // Period of time in seconds given to the resource to terminate gracefully when delete it (used when require to recreate the resource). Ignored if negative. Set to 1 for immediate shutdown. Can only be set to 0 when force is true (force deletion)
+ gracePeriod = -1
+ // If true, cascade the deletion of the resources managed by this resource (e.g. Pods created by a ReplicationController).
+ cascade = true
+)
+
+// Delete creates a resource with the given content
+func (c *Client) Delete(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.DeleteResource(r)
+}
+
+// DeleteFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) DeleteFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.DeleteResource(r)
+}
+
+// DeleteResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) DeleteResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(delete)
+}
+
+func delete(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("delete", info, err)
+ }
+
+ // TODO: Background or Foreground?
+ // policy := metav1.DeletePropagationForeground
+ policy := metav1.DeletePropagationBackground
+ options := metav1.DeleteOptions{
+ PropagationPolicy: &policy,
+ }
+
+ if _, err := deleteWithOptions(info, &options); err != nil {
+ return failedTo("delete", info, err)
+ }
+ return nil
+}
+
+func defaultDeleteOptions() *metav1.DeleteOptions {
+ // TODO: Change DryRun value when DryRun is implemented
+ dryRun := false
+
+ options := &metav1.DeleteOptions{}
+ if gracePeriod >= 0 {
+ options = metav1.NewDeleteOptions(int64(gracePeriod))
+ }
+
+ if dryRun {
+ options.DryRun = []string{metav1.DryRunAll}
+ }
+
+ // TODO: Background or Foreground?
+ // policy := metav1.DeletePropagationBackground
+ policy := metav1.DeletePropagationForeground
+ if !cascade {
+ policy = metav1.DeletePropagationOrphan
+ }
+ options.PropagationPolicy = &policy
+
+ return options
+}
+
+func deleteWithOptions(info *resource.Info, options *metav1.DeleteOptions) (runtime.Object, error) {
+ if options == nil {
+ options = defaultDeleteOptions()
+ }
+ return resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, options)
+}
diff --git a/src/rsync/pkg/client/factory.go b/src/rsync/pkg/client/factory.go
new file mode 100644
index 00000000..39c1a177
--- /dev/null
+++ b/src/rsync/pkg/client/factory.go
@@ -0,0 +1,291 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "path/filepath"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/genericclioptions"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/discovery"
+ diskcached "k8s.io/client-go/discovery/cached/disk"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/restmapper"
+ "k8s.io/client-go/tools/clientcmd"
+ "k8s.io/client-go/util/homedir"
+ "k8s.io/kubectl/pkg/util/openapi"
+ openapivalidation "k8s.io/kubectl/pkg/util/openapi/validation"
+ "k8s.io/kubectl/pkg/validation"
+)
+
+// factory implements the kubectl Factory interface which also requieres to
+// implement the genericclioptions.RESTClientGetter interface.
+// The Factory inerface provides abstractions that allow the Kubectl command to
+// be extended across multiple types of resources and different API sets.
+type factory struct {
+ KubeConfig string
+ Context string
+ initOpenAPIGetterOnce sync.Once
+ openAPIGetter openapi.Getter
+}
+
+// If multiple clients are created, this sync.once make sure the CRDs are added
+// only once into the API extensions v1 and v1beta schemes
+var addToSchemeOnce sync.Once
+
+var _ genericclioptions.RESTClientGetter = &factory{}
+
+// newFactory creates a new client factory which encapsulate a REST client getter
+func newFactory(context, kubeconfig string) *factory {
+ factory := &factory{
+ KubeConfig: kubeconfig,
+ Context: context,
+ }
+
+ // From: helm/pkg/kube/client.go > func New()
+ // Add CRDs to the scheme. They are missing by default.
+ addToSchemeOnce.Do(func() {
+ if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
+ panic(err)
+ }
+ if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
+ panic(err)
+ }
+ })
+
+ return factory
+}
+
+// BuildRESTConfig builds a kubernetes REST client factory using the following
+// rules from ToRawKubeConfigLoader()
+// func BuildRESTConfig(context, kubeconfig string) (*rest.Config, error) {
+// return newFactory(context, kubeconfig).ToRESTConfig()
+// }
+
+// ToRESTConfig creates a kubernetes REST client factory.
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRESTConfig() (*rest.Config, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go > func setKubernetesDefaults()
+ config, err := f.ToRawKubeConfigLoader().ClientConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ if config.GroupVersion == nil {
+ config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
+ }
+ if config.APIPath == "" {
+ config.APIPath = "/api"
+ }
+ if config.NegotiatedSerializer == nil {
+ // This codec config ensures the resources are not converted. Therefore, resources
+ // will not be round-tripped through internal versions. Defaulting does not happen
+ // on the client.
+ config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
+ }
+
+ rest.SetKubernetesDefaults(config)
+ return config, nil
+}
+
+// ToRawKubeConfigLoader creates a client factory using the following rules:
+// 1. builds from the given kubeconfig path, if not empty
+// 2. use the in cluster factory if running in-cluster
+// 3. gets the factory from KUBECONFIG env var
+// 4. Uses $HOME/.kube/factory
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRawKubeConfigLoader() clientcmd.ClientConfig {
+ loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
+ if len(f.KubeConfig) != 0 {
+ loadingRules.ExplicitPath = f.KubeConfig
+ }
+ configOverrides := &clientcmd.ConfigOverrides{
+ ClusterDefaults: clientcmd.ClusterDefaults,
+ }
+ if len(f.Context) != 0 {
+ configOverrides.CurrentContext = f.Context
+ }
+
+ return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
+}
+
+// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive
+var overlyCautiousIllegalFileCharacters = regexp.MustCompile(`[^(\w/\.)]`)
+
+// ToDiscoveryClient returns a CachedDiscoveryInterface using a computed RESTConfig
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
+ // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToDiscoveryClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ factory.Burst = 100
+ defaultHTTPCacheDir := filepath.Join(homedir.HomeDir(), ".kube", "http-cache")
+
+ // takes the parentDir and the host and comes up with a "usually non-colliding" name for the discoveryCacheDir
+ parentDir := filepath.Join(homedir.HomeDir(), ".kube", "cache", "discovery")
+ // strip the optional scheme from host if its there:
+ schemelessHost := strings.Replace(strings.Replace(factory.Host, "https://", "", 1), "http://", "", 1)
+ // now do a simple collapse of non-AZ09 characters. Collisions are possible but unlikely. Even if we do collide the problem is short lived
+ safeHost := overlyCautiousIllegalFileCharacters.ReplaceAllString(schemelessHost, "_")
+ discoveryCacheDir := filepath.Join(parentDir, safeHost)
+
+ return diskcached.NewCachedDiscoveryClientForConfig(factory, discoveryCacheDir, defaultHTTPCacheDir, time.Duration(10*time.Minute))
+}
+
+// ToRESTMapper returns a mapper
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRESTMapper() (meta.RESTMapper, error) {
+ // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToRESTMapper()
+ discoveryClient, err := f.ToDiscoveryClient()
+ if err != nil {
+ return nil, err
+ }
+
+ mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
+ expander := restmapper.NewShortcutExpander(mapper, discoveryClient)
+ return expander, nil
+}
+
+// KubernetesClientSet creates a kubernetes clientset from the configuration
+// It's required to implement the Factory interface
+func (f *factory) KubernetesClientSet() (*kubernetes.Clientset, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) KubernetesClientSet()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return kubernetes.NewForConfig(factory)
+}
+
+// DynamicClient creates a dynamic client from the configuration
+// It's required to implement the Factory interface
+func (f *factory) DynamicClient() (dynamic.Interface, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) DynamicClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return dynamic.NewForConfig(factory)
+}
+
+// NewBuilder returns a new resource builder for structured api objects.
+// It's required to implement the Factory interface
+func (f *factory) NewBuilder() *resource.Builder {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) NewBuilder()
+ return resource.NewBuilder(f)
+}
+
+// RESTClient creates a REST client from the configuration
+// It's required to implement the Factory interface
+func (f *factory) RESTClient() (*rest.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) RESTClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return rest.RESTClientFor(factory)
+}
+
+func (f *factory) configForMapping(mapping *meta.RESTMapping) (*rest.Config, error) {
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ gvk := mapping.GroupVersionKind
+ factory.APIPath = "/apis"
+ if gvk.Group == corev1.GroupName {
+ factory.APIPath = "/api"
+ }
+ gv := gvk.GroupVersion()
+ factory.GroupVersion = &gv
+
+ return factory, nil
+}
+
+// ClientForMapping creates a resource REST client from the given mappings
+// It's required to implement the Factory interface
+func (f *factory) ClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) ClientForMapping()
+ factory, err := f.configForMapping(mapping)
+ if err != nil {
+ return nil, err
+ }
+
+ return rest.RESTClientFor(factory)
+}
+
+// UnstructuredClientForMapping creates a unstructured resource REST client from the given mappings
+// It's required to implement the Factory interface
+func (f *factory) UnstructuredClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) UnstructuredClientForMapping()
+ factory, err := f.configForMapping(mapping)
+ if err != nil {
+ return nil, err
+ }
+ factory.ContentConfig = resource.UnstructuredPlusDefaultContentConfig()
+
+ return rest.RESTClientFor(factory)
+}
+
+// Validator returns a schema that can validate objects stored on disk.
+// It's required to implement the Factory interface
+func (f *factory) Validator(validate bool) (validation.Schema, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) Validator(bool)
+ if !validate {
+ return validation.NullSchema{}, nil
+ }
+
+ resources, err := f.OpenAPISchema()
+ if err != nil {
+ return nil, err
+ }
+
+ return validation.ConjunctiveSchema{
+ openapivalidation.NewSchemaValidation(resources),
+ validation.NoDoubleKeySchema{},
+ }, nil
+}
+
+// OpenAPISchema returns metadata and structural information about Kubernetes object definitions.
+// It's required to implement the Factory interface
+func (f *factory) OpenAPISchema() (openapi.Resources, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client-access.go > func (*factoryImpl) OpenAPISchema()
+ discovery, err := f.ToDiscoveryClient()
+ if err != nil {
+ return nil, err
+ }
+
+ f.initOpenAPIGetterOnce.Do(func() {
+ // Create the caching OpenAPIGetter
+ f.openAPIGetter = openapi.NewOpenAPIGetter(discovery)
+ })
+
+ // Delegate to the OpenAPIGetter
+ return f.openAPIGetter.Get()
+}
diff --git a/src/rsync/pkg/client/helpers.go b/src/rsync/pkg/client/helpers.go
new file mode 100644
index 00000000..de6ed93f
--- /dev/null
+++ b/src/rsync/pkg/client/helpers.go
@@ -0,0 +1,75 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// CreateNamespace creates a namespace with the given name
+func (c *Client) CreateNamespace(namespace string) error {
+ ns := &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: namespace,
+ Labels: map[string]string{
+ "name": namespace,
+ },
+ },
+ }
+ _, err := c.Clientset.CoreV1().Namespaces().Create(ns)
+ // if errors.IsAlreadyExists(err) {
+ // // If it failed because the NS is already there, then do not return such error
+ // return nil
+ // }
+
+ return err
+}
+
+// DeleteNamespace deletes the namespace with the given name
+func (c *Client) DeleteNamespace(namespace string) error {
+ return c.Clientset.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{})
+}
+
+// NodesReady returns the number of nodes ready
+func (c *Client) NodesReady() (ready int, total int, err error) {
+ nodes, err := c.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
+ if err != nil {
+ return 0, 0, err
+ }
+ total = len(nodes.Items)
+ if total == 0 {
+ return 0, 0, nil
+ }
+ for _, n := range nodes.Items {
+ for _, c := range n.Status.Conditions {
+ if c.Type == "Ready" && c.Status == "True" {
+ ready++
+ break
+ }
+ }
+ }
+
+ return ready, len(nodes.Items), nil
+}
+
+// Version returns the cluster version. It can be used to verify if the cluster
+// is reachable. It will return an error if failed to connect.
+func (c *Client) Version() (string, error) {
+ v, err := c.Clientset.ServerVersion()
+ if err != nil {
+ return "", err
+ }
+
+ return v.String(), nil
+}
diff --git a/src/rsync/pkg/client/patch.go b/src/rsync/pkg/client/patch.go
new file mode 100644
index 00000000..3a620f49
--- /dev/null
+++ b/src/rsync/pkg/client/patch.go
@@ -0,0 +1,216 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/jonboulle/clockwork"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/jsonmergepatch"
+ "k8s.io/apimachinery/pkg/util/mergepatch"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/cli-runtime/pkg/resource"
+ oapi "k8s.io/kube-openapi/pkg/util/proto"
+ "k8s.io/kubectl/pkg/scheme"
+ "k8s.io/kubectl/pkg/util"
+ "k8s.io/kubectl/pkg/util/openapi"
+)
+
+const (
+ // overwrite if true, automatically resolve conflicts between the modified and live configuration by using values from the modified configuration
+ overwrite = true
+ // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
+ maxPatchRetry = 5
+ // backOffPeriod is the period to back off when apply patch results in error.
+ backOffPeriod = 1 * time.Second
+ // how many times we can retry before back off
+ triesBeforeBackOff = 1
+ // force if true, immediately remove resources from API and bypass graceful deletion. Note that immediate deletion of some resources may result in inconsistency or data loss and requires confirmation.
+ force = false
+ // timeout waiting for the resource to be delete if it needs to be recreated
+ timeout = 0
+)
+
+// patch tries to patch an OpenAPI resource
+func patch(info *resource.Info, current runtime.Object) error {
+ // From: k8s.io/kubectl/pkg/cmd/apply/apply.go & patcher.go
+ modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ if err != nil {
+ return fmt.Errorf("retrieving modified configuration. %s", err)
+ }
+
+ metadata, _ := meta.Accessor(current)
+ annotationMap := metadata.GetAnnotations()
+ if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok {
+ // TODO: Find what to do with the warnings, they should not be printed
+ fmt.Fprintf(os.Stderr, "Warning: apply should be used on resource created by apply")
+ }
+
+ patchBytes, patchObject, err := patchSimple(current, modified, info)
+
+ var getErr error
+ for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
+ if i > triesBeforeBackOff {
+ clockwork.NewRealClock().Sleep(backOffPeriod)
+ }
+ current, getErr = resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, false)
+ if getErr != nil {
+ return getErr
+ }
+ patchBytes, patchObject, err = patchSimple(current, modified, info)
+ }
+ if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && force {
+ patchBytes, patchObject, err = deleteAndCreate(info, patchBytes)
+ }
+
+ info.Refresh(patchObject, true)
+
+ return nil
+}
+
+func patchSimple(currentObj runtime.Object, modified []byte, info *resource.Info) ([]byte, runtime.Object, error) {
+ // Serialize the current configuration of the object from the server.
+ current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("serializing current configuration. %s", err)
+ }
+
+ // Retrieve the original configuration of the object from the annotation.
+ original, err := util.GetOriginalConfiguration(currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("retrieving original configuration. %s", err)
+ }
+
+ var patchType types.PatchType
+ var patch []byte
+ var lookupPatchMeta strategicpatch.LookupPatchMeta
+ var schema oapi.Schema
+
+ // Create the versioned struct from the type defined in the restmapping
+ // (which is the API version we'll be submitting the patch to)
+ versionedObject, err := scheme.Scheme.New(info.Mapping.GroupVersionKind)
+
+ // DEBUG:
+ // fmt.Printf("Modified: %v\n", string(modified))
+ // fmt.Printf("Current: %v\n", string(current))
+ // fmt.Printf("Original: %v\n", string(original))
+ // fmt.Printf("versionedObj: %v\n", versionedObject)
+ // fmt.Printf("Error: %+v\nIsNotRegisteredError: %t\n", err, runtime.IsNotRegisteredError(err))
+
+ switch {
+ case runtime.IsNotRegisteredError(err):
+ // fall back to generic JSON merge patch
+ patchType = types.MergePatchType
+ preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
+ mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
+ patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
+ if err != nil {
+ if mergepatch.IsPreconditionFailed(err) {
+ return nil, nil, fmt.Errorf("At least one of apiVersion, kind and name was changed")
+ }
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ case err != nil:
+ return nil, nil, fmt.Errorf("getting instance of versioned object. %s", err)
+ case err == nil:
+ // Compute a three way strategic merge patch to send to server.
+ patchType = types.StrategicMergePatchType
+
+ // Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
+ // Otherwise, fall back to baked-in types.
+ var openapiSchema openapi.Resources
+ if openapiSchema != nil {
+ if schema = openapiSchema.LookupResource(info.Mapping.GroupVersionKind); schema != nil {
+ lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
+ if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite); err == nil {
+ patchType = types.StrategicMergePatchType
+ patch = openapiPatch
+ // TODO: In case it's necessary to report warnings
+ // } else {
+ // log.Printf("Warning: error calculating patch from openapi spec: %s", err)
+ }
+ }
+ }
+
+ if patch == nil {
+ lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ }
+ }
+
+ if string(patch) == "{}" {
+ return patch, currentObj, nil
+ }
+
+ patchedObj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, patchType, patch, nil)
+ return patch, patchedObj, err
+}
+
+func deleteAndCreate(info *resource.Info, modified []byte) ([]byte, runtime.Object, error) {
+ delOptions := defaultDeleteOptions()
+ if _, err := deleteWithOptions(info, delOptions); err != nil {
+ return nil, nil, err
+ }
+
+ helper := resource.NewHelper(info.Client, info.Mapping)
+
+ // TODO: make a waiter and use it
+ if err := wait.PollImmediate(1*time.Second, time.Duration(timeout), func() (bool, error) {
+ if _, err := helper.Get(info.Namespace, info.Name, false); !errors.IsNotFound(err) {
+ return false, err
+ }
+ return true, nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // TODO: Check what GetModifiedConfiguration does, this could be an encode - decode waste of time
+ // modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ // if err != nil {
+ // return nil, nil, fmt.Errorf("retrieving modified configuration. %s", err)
+ // }
+ versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ options := metav1.CreateOptions{}
+ createdObject, err := helper.Create(info.Namespace, true, versionedObject, &options)
+ if err != nil {
+ // restore the original object if we fail to create the new one
+ // but still propagate and advertise error to user
+ recreated, recreateErr := helper.Create(info.Namespace, true, info.Object, &options)
+ if recreateErr != nil {
+ err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one. %v.\n\nAdditionally, an error occurred attempting to restore the original object: %v", err, recreateErr)
+ } else {
+ createdObject = recreated
+ }
+ }
+ return modified, createdObject, err
+}
diff --git a/src/rsync/pkg/client/replace.go b/src/rsync/pkg/client/replace.go
new file mode 100644
index 00000000..9aba8aca
--- /dev/null
+++ b/src/rsync/pkg/client/replace.go
@@ -0,0 +1,51 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+// Replace creates a resource with the given content
+func (c *Client) Replace(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.ReplaceResource(r)
+}
+
+// ReplaceFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) ReplaceFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.ReplaceResource(r)
+}
+
+// ReplaceResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) ReplaceResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(replace)
+}
+
+func replace(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("replace", info, err)
+ }
+
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Replace(info.Namespace, info.Name, true, info.Object)
+ if err != nil {
+ return failedTo("replace", info, err)
+ }
+ info.Refresh(obj, true)
+
+ return nil
+}
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go
index fc8aa839..2d15d7ec 100644
--- a/src/rsync/pkg/connector/connector.go
+++ b/src/rsync/pkg/connector/connector.go
@@ -1,94 +1,122 @@
/*
- * Copyright 2019 Intel Corporation, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+Copyright 2020 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
package connector
import (
+ "encoding/base64"
+ "fmt"
"log"
+ "os"
+ "strings"
+ "sync"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/meta"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/client-go/dynamic"
- "k8s.io/client-go/kubernetes"
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
+ kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
+ pkgerrors "github.com/pkg/errors"
)
-// KubernetesConnector is an interface that is expected to be implemented
-// by any code that calls the plugin framework functions.
-// It implements methods that are needed by the plugins to get Kubernetes
-// clients and other information needed to interface with Kubernetes
-type KubernetesConnector interface {
- //GetMapper returns the RESTMapper that was created for this client
- GetMapper() meta.RESTMapper
-
- //GetDynamicClient returns the dynamic client that is needed for
- //unstructured REST calls to the apiserver
- GetDynamicClient() dynamic.Interface
+type Connector struct {
+ cid string
+ Clients map[string]*kubeclient.Client
+ sync.Mutex
+}
- // GetStandardClient returns the standard client that can be used to handle
- // standard kubernetes kinds
- GetStandardClient() kubernetes.Interface
+const basePath string = "/tmp/rsync/"
- //GetInstanceID returns the InstanceID for tracking during creation
- GetInstanceID() string
+// Init connector for an app context
+func Init(id interface{}) *Connector {
+ c := make(map[string]*kubeclient.Client)
+ str := fmt.Sprintf("%v", id)
+ return &Connector{
+ Clients: c,
+ cid: str,
+ }
}
-// Reference is the interface that is implemented
-type Reference interface {
- //Create a kubernetes resource described by the yaml in yamlFilePath
- Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error)
- //Delete a kubernetes resource described in the provided namespace
- Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername.
+func getKubeConfig(clustername string) ([]byte, error) {
+ if !strings.Contains(clustername, "+") {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+ return nil, pkgerrors.New("Get kubeconfig failed")
+ }
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dec, nil
}
-// TagPodsIfPresent finds the PodTemplateSpec from any workload
-// object that contains it and changes the spec to include the tag label
-func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
+// GetClient returns client for the cluster
+func (c *Connector) GetClient(cluster string) (*kubeclient.Client, error) {
+ c.Lock()
+ defer c.Unlock()
- spec, ok := unstruct.Object["spec"].(map[string]interface{})
+ client, ok := c.Clients[cluster]
if !ok {
- log.Println("Error converting spec to map")
- return
- }
-
- template, ok := spec["template"].(map[string]interface{})
- if !ok {
- log.Println("Error converting template to map")
- return
+ // Get file from DB
+ dec, err := getKubeConfig(cluster)
+ if err != nil {
+ return nil, err
+ }
+ var kubeConfigPath string = basePath + c.cid + "/" + cluster + "/"
+ if _, err := os.Stat(kubeConfigPath); os.IsNotExist(err) {
+ err = os.MkdirAll(kubeConfigPath, 0755)
+ if err != nil {
+ return nil, err
+ }
+ }
+ kubeConfig := kubeConfigPath + "config"
+ f, err := os.Create(kubeConfig)
+ if err != nil {
+ return nil, err
+ }
+ _, err = f.Write(dec)
+ if err != nil {
+ return nil, err
+ }
+ client = kubeclient.New("", kubeConfig, "default")
+ if client != nil {
+ c.Clients[cluster] = client
+ }
}
+ return client, nil
+}
- //Attempt to convert the template to a podtemplatespec.
- //This is to check if we have any pods being created.
- podTemplateSpec := &corev1.PodTemplateSpec{}
- err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec)
+func (c *Connector) GetClientWithRetry(cluster string) (*kubeclient.Client, error) {
+ client, err := c.GetClient(cluster)
if err != nil {
- log.Println("Did not find a podTemplateSpec: " + err.Error())
- return
+ return nil, err
}
-
- labels := podTemplateSpec.GetLabels()
- if labels == nil {
- labels = map[string]string{}
+ if err = client.IsReachable(); err != nil {
+ return nil, err // TODO: Add retry
}
- labels["emco/deployment-id"] = tag
- podTemplateSpec.SetLabels(labels)
-
- updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
+ return client, nil
+}
- //Set the label
- spec["template"] = updatedTemplate
+func (c *Connector) RemoveClient() {
+ c.Lock()
+ defer c.Unlock()
+ err := os.RemoveAll(basePath + "/" + c.cid)
+ if err != nil {
+ log.Printf("Warning: Deleting kubepath %s", err)
+ }
}
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index 7e0fce3c..3ce6ee9b 100644
--- a/src/rsync/pkg/context/context.go
+++ b/src/rsync/pkg/context/context.go
@@ -17,503 +17,231 @@
package context
import (
+ "context"
"encoding/json"
"fmt"
"log"
"strings"
- "sync"
"github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
- "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
- con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
- res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+ "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
+ kubeclient "github.com/onap/multicloud-k8s/src/rsync/pkg/client"
+ connector "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+ utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
status "github.com/onap/multicloud-k8s/src/rsync/pkg/status"
pkgerrors "github.com/pkg/errors"
+ "golang.org/x/sync/errgroup"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
type CompositeAppContext struct {
- cid interface{}
- appsorder string
- appsdependency string
- appsmap []instMap
-}
-type clusterInfo struct {
- name string
- resorder string
- resdependency string
- ressmap []instMap
-}
-type instMap struct {
- name string
- depinfo string
- status string
- rerr error
- clusters []clusterInfo
-}
-
-func getInstMap(order string, dependency string, level string) ([]instMap, error) {
-
- if order == "" {
- return nil, pkgerrors.Errorf("Not a valid order value")
- }
- if dependency == "" {
- return nil, pkgerrors.Errorf("Not a valid dependency value")
- }
-
- if !(level == "app" || level == "res") {
- return nil, pkgerrors.Errorf("Not a valid level name given to create map")
- }
-
- var aov map[string]interface{}
- json.Unmarshal([]byte(order), &aov)
-
- s := fmt.Sprintf("%vorder", level)
- appso := aov[s].([]interface{})
- var instmap = make([]instMap, len(appso))
-
- var adv map[string]interface{}
- json.Unmarshal([]byte(dependency), &adv)
- s = fmt.Sprintf("%vdependency", level)
- appsd := adv[s].(map[string]interface{})
- for i, u := range appso {
- instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
- }
-
- return instmap, nil
+ cid interface{}
}
-func deleteResource(clustername string, resname string, respath string) error {
- k8sClient := app.KubernetesClient{}
- err := k8sClient.Init(clustername, resname)
+func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, error) {
+ var byteRes []byte
+ rh, err := ac.GetResourceHandle(app, cluster, name)
if err != nil {
- log.Println("Init failed: " + err.Error())
- return err
+ return nil, err
}
-
- var c con.KubernetesConnector
- c = &k8sClient
- var gp res.Resource
- err = gp.Delete(respath, resname, "default", c)
- if err != nil {
- log.Println("Delete resource failed: " + err.Error() + resname)
- return err
- }
- log.Println("Resource succesfully deleted", resname)
- return nil
-
-}
-
-func createResource(clustername string, resname string, respath string, label string) error {
- k8sClient := app.KubernetesClient{}
- err := k8sClient.Init(clustername, resname)
- if err != nil {
- log.Println("Client init failed: " + err.Error())
- return err
- }
-
- var c con.KubernetesConnector
- c = &k8sClient
- var gp res.Resource
- _, err = gp.Create(respath, "default", label, c)
- if err != nil {
- log.Println("Create failed: " + err.Error() + resname)
- return err
- }
- log.Println("Resource succesfully created", resname)
- return nil
-
-}
-
-func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
-
- rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
- if err != nil {
- return err
- }
-
resval, err := ac.GetValue(rh)
if err != nil {
- return err
+ return nil, err
}
-
if resval != "" {
- result := strings.Split(resmap.name, "+")
+ result := strings.Split(name, "+")
if result[0] == "" {
- return pkgerrors.Errorf("Resource name is nil")
- }
- err = deleteResource(clustername, result[0], resval.(string))
- if err != nil {
- return err
+ return nil, pkgerrors.Errorf("Resource name is nil %s:", name)
}
+ byteRes = []byte(fmt.Sprintf("%v", resval.(interface{})))
} else {
- return pkgerrors.Errorf("Resource value is nil")
+ return nil, pkgerrors.Errorf("Resource value is nil %s", name)
}
-
- return nil
-
+ return byteRes, nil
}
-func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error {
- rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+func terminateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
+ res, err := getRes(ac, name, app, cluster)
if err != nil {
return err
}
-
- resval, err := ac.GetValue(rh)
- if err != nil {
+ if err := c.Delete(res); err != nil {
+ logutils.Error("Failed to delete res", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
return err
}
-
- if resval != "" {
- result := strings.Split(resmap.name, "+")
- if result[0] == "" {
- return pkgerrors.Errorf("Resource name is nil")
- }
- err = createResource(clustername, result[0], resval.(string), label)
- if err != nil {
- return err
- }
- } else {
- return pkgerrors.Errorf("Resource value is nil")
- }
-
+ logutils.Info("Deleted::", logutils.Fields{
+ "cluster": cluster,
+ "resource": res,
+ })
return nil
-
}
-func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
- for j := 0; j < len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
+func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name string, app string, cluster string, label string) error {
+ res, err := getRes(ac, name, app, cluster)
+ if err != nil {
+ return err
}
- wg.Wait()
- for k := 0; k < len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources termination")
- }
+ //Decode the yaml to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err = utils.DecodeYAMLData(string(res), unstruct)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Decode deployment object error")
}
- return nil
-
-}
-
-func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(ressmap))
- cid, _ := ac.GetCompositeAppHandle()
- results := strings.Split(cid.(string), "/")
- label := results[2] + "-" + appname
-
- for l := range chans {
- chans[l] = make(chan int)
+ //Add the tracking label to all resources created here
+ labels := unstruct.GetLabels()
+ //Check if labels exist for this object
+ if labels == nil {
+ labels = map[string]string{}
}
- for i := 0; i < len(ressmap); i++ {
- wg.Add(1)
- go func(index int) {
- if ressmap[index].depinfo == "go" {
- ressmap[index].status = "start"
- } else {
- ressmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- ressmap[index].status = "error"
- ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- ressmap[index].status = "start"
- }
- ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label)
- ressmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
- for j := 0; j < len(ressmap); j++ {
- if ressmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(ressmap); k++ {
- if ressmap[k].rerr != nil {
- return pkgerrors.Errorf("error during resources instantiation")
- }
- }
- return nil
-
-}
-
-func terminateApp(ac appcontext.AppContext, appmap instMap) error {
+ //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ labels["emco/deployment-id"] = label
+ unstruct.SetLabels(labels)
- for i := 0; i < len(appmap.clusters); i++ {
- err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
- if err != nil {
- return err
- }
+ // This checks if the resource we are creating has a podSpec in it
+ // Eg: Deployment, StatefulSet, Job etc..
+ // If a PodSpec is found, the label will be added to it too.
+ //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+ utils.TagPodsIfPresent(unstruct, label)
+ b, err := unstruct.MarshalJSON()
+ if err != nil {
+ logutils.Error("Failed to MarshalJSON", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
+ return err
}
- log.Println("Termination of app done: " + appmap.name)
-
- return nil
-
-}
-
-func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
-
- for i := 0; i < len(appmap.clusters); i++ {
- err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
- appmap.clusters[i].name)
- if err != nil {
- return err
- }
- err = status.StartClusterWatcher(appmap.clusters[i].name)
- if err != nil {
- log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err)
- }
+ if err := c.Apply(b); err != nil {
+ logutils.Error("Failed to apply res", logutils.Fields{
+ "error": err,
+ "resource": name,
+ })
+ return err
}
- log.Println("Instantiation of app done: " + appmap.name)
+ logutils.Info("Installed::", logutils.Fields{
+ "cluster": cluster,
+ "resource": res,
+ })
return nil
-
}
-func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(appsmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(appsmap); i++ {
- wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
- appsmap[index].status = "start"
- } else {
- appsmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- appsmap[index].status = "start"
- }
- appsmap[index].rerr = instantiateApp(ac, appsmap[index])
- appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
- for j := 0; j < len(appsmap); j++ {
- if appsmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(appsmap); k++ {
- if appsmap[k].rerr != nil {
- return pkgerrors.Errorf("error during apps instantiation")
- }
- }
- return nil
-
-}
+type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
-func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+func applyFnComApp(cid interface{}, con *connector.Connector, f fn, breakonError bool) error {
ac := appcontext.AppContext{}
-
+ g, _ := errgroup.WithContext(context.Background())
_, err := ac.LoadAppContext(cid)
if err != nil {
return err
}
- instca.cid = cid
-
- appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
- instca.appsorder = appsorder.(string)
- appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
- instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ appsOrder, err := ac.GetAppInstruction("order")
if err != nil {
return err
}
+ var appList map[string][]string
+ json.Unmarshal([]byte(appsOrder.(string)), &appList)
+ logutils.Info("appsorder ", logutils.Fields{
+ "appsorder": appsOrder,
+ "string": appList,
+ })
+ id, _ := ac.GetCompositeAppHandle()
- for j := 0; j < len(instca.appsmap); j++ {
- clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k := 0; k < len(clusternames); k++ {
- instca.appsmap[j].clusters[k].name = clusternames[k]
- resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters[k].resorder = resorder.(string)
+ for _, app := range appList["apporder"] {
- resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
+ appName := app
+ results := strings.Split(id.(string), "/")
+ label := results[2] + "-" + app
+ g.Go(func() error {
+ clusterNames, err := ac.GetClusterNames(appName)
if err != nil {
return err
}
- instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
-
- instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
- instca.appsmap[j].clusters[k].resorder,
- instca.appsmap[j].clusters[k].resdependency, "res")
- if err != nil {
+ rg, _ := errgroup.WithContext(context.Background())
+ for k := 0; k < len(clusterNames); k++ {
+ cluster := clusterNames[k]
+ err = status.StartClusterWatcher(cluster)
+ if err != nil {
+ log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
+ }
+ rg.Go(func() error {
+ c, err := con.GetClient(cluster)
+ if err != nil {
+ logutils.Error("Error in creating kubeconfig client", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "appName": appName,
+ })
+ return err
+ }
+ resorder, err := ac.GetResourceInstruction(appName, cluster, "order")
+ if err != nil {
+ logutils.Error("Resorder error ", logutils.Fields{"error": err})
+ return err
+ }
+ var aov map[string][]string
+ json.Unmarshal([]byte(resorder.(string)), &aov)
+ for _, res := range aov["resorder"] {
+ err = f(ac, c, res, appName, cluster, label)
+ if err != nil {
+ logutils.Error("Error in resource %s: %v", logutils.Fields{
+ "error": err,
+ "cluster": cluster,
+ "resource": res,
+ })
+ if breakonError {
+ return err
+ }
+ }
+ }
+ return nil
+ })
+ }
+ if err := rg.Wait(); err != nil {
+ logutils.Error("Encountered error in App cluster", logutils.Fields{
+ "error": err,
+ })
return err
}
- }
+ return nil
+ })
}
- err = instantiateApps(ac, instca.appsmap)
- if err != nil {
+ if err := g.Wait(); err != nil {
+ logutils.Error("Encountered error", logutils.Fields{
+ "error": err,
+ })
return err
}
-
return nil
}
-// Delete all the apps
-func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
- var wg sync.WaitGroup
- var chans = make([]chan int, len(appsmap))
- for l := range chans {
- chans[l] = make(chan int)
- }
- for i := 0; i < len(appsmap); i++ {
- wg.Add(1)
- go func(index int) {
- if appsmap[index].depinfo == "go" {
- appsmap[index].status = "start"
- } else {
- appsmap[index].status = "waiting"
- c := <-chans[index]
- if c != index {
- appsmap[index].status = "error"
- appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
- wg.Done()
- return
- }
- appsmap[index].status = "start"
- }
- appsmap[index].rerr = terminateApp(ac, appsmap[index])
- appsmap[index].status = "done"
- waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
- for j := 0; j < len(appsmap); j++ {
- if appsmap[j].depinfo == waitstr {
- chans[j] <- j
- }
- }
- wg.Done()
- }(i)
- }
- wg.Wait()
- for k := 0; k < len(appsmap); k++ {
- if appsmap[k].rerr != nil {
- return pkgerrors.Errorf("error during apps instantiation")
- }
+// InstantiateComApp Instantiate Apps in Composite App
+func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+ con := connector.Init(cid)
+ err := applyFnComApp(cid, con, instantiateResource, true)
+ if err != nil {
+ logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
+ return err
}
+ //Cleanup
+ con.RemoveClient()
return nil
-
}
-// Delete all the resources for a given context
+// TerminateComApp Terminates Apps in Composite App
func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
- ac := appcontext.AppContext{}
-
- _, err := ac.LoadAppContext(cid)
- if err != nil {
- return err
- }
- instca.cid = cid
-
- appsorder, err := ac.GetAppInstruction("order")
- if err != nil {
- return err
- }
- instca.appsorder = appsorder.(string)
- appsdependency, err := ac.GetAppInstruction("dependency")
- if err != nil {
- return err
- }
- instca.appsdependency = appsdependency.(string)
- instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+ con := connector.Init(cid)
+ err := applyFnComApp(cid, con, terminateResource, false)
if err != nil {
+ logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
+ "error": err,
+ })
return err
}
-
- for j := 0; j < len(instca.appsmap); j++ {
- clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
- for k := 0; k < len(clusternames); k++ {
- instca.appsmap[j].clusters[k].name = clusternames[k]
- resorder, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "order")
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters[k].resorder = resorder.(string)
-
- resdependency, err := ac.GetResourceInstruction(
- instca.appsmap[j].name, clusternames[k], "dependency")
- if err != nil {
- return err
- }
- instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
-
- instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
- instca.appsmap[j].clusters[k].resorder,
- instca.appsmap[j].clusters[k].resdependency, "res")
- if err != nil {
- return err
- }
- }
- }
- err = terminateApps(ac, instca.appsmap)
- if err != nil {
- return err
- }
-
+ //Cleanup
+ con.RemoveClient()
return nil
-
}
diff --git a/src/rsync/pkg/internal/utils.go b/src/rsync/pkg/internal/utils.go
index 270edba5..09415ed5 100644
--- a/src/rsync/pkg/internal/utils.go
+++ b/src/rsync/pkg/internal/utils.go
@@ -1,5 +1,5 @@
/*
-Copyright 2018 Intel Corporation.
+Copyright 2020 Intel Corporation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -15,10 +15,14 @@ package utils
import (
"io/ioutil"
+ "log"
"os"
"path"
+ corev1 "k8s.io/api/core/v1"
+
pkgerrors "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
)
@@ -68,3 +72,41 @@ func EnsureDirectory(f string) error {
}
return os.MkdirAll(base, 0755)
}
+
+// TagPodsIfPresent finds the PodTemplateSpec from any workload
+// object that contains it and changes the spec to include the tag label
+func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
+
+ spec, ok := unstruct.Object["spec"].(map[string]interface{})
+ if !ok {
+ log.Println("Error converting spec to map")
+ return
+ }
+
+ template, ok := spec["template"].(map[string]interface{})
+ if !ok {
+ //log.Println("Error converting template to map")
+ return
+ }
+ log.Println("Apply label in template")
+ //Attempt to convert the template to a podtemplatespec.
+ //This is to check if we have any pods being created.
+ podTemplateSpec := &corev1.PodTemplateSpec{}
+ err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec)
+ if err != nil {
+ log.Println("Did not find a podTemplateSpec: " + err.Error())
+ return
+ }
+
+ labels := podTemplateSpec.GetLabels()
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels["emco/deployment-id"] = tag
+ podTemplateSpec.SetLabels(labels)
+
+ updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
+
+ //Set the label
+ spec["template"] = updatedTemplate
+}
diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go
deleted file mode 100644
index 2877e2a3..00000000
--- a/src/rsync/pkg/resource/resource.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-Copyright 2018 Intel Corporation.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package resource
-
-import (
- pkgerrors "github.com/pkg/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/runtime/schema"
-
- "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
- utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
-)
-
-type Resource struct {
-}
-
-// Create deployment object in a specific Kubernetes cluster
-func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) {
- if namespace == "" {
- namespace = "default"
- }
-
- //Decode the yaml file to create a runtime.Object
- unstruct := &unstructured.Unstructured{}
- //Ignore the returned obj as we expect the data in unstruct
- _, err := utils.DecodeYAMLData(data, unstruct)
- if err != nil {
- return "", pkgerrors.Wrap(err, "Decode deployment object error")
- }
-
- dynClient := client.GetDynamicClient()
- mapper := client.GetMapper()
-
- gvk := unstruct.GroupVersionKind()
- mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
- if err != nil {
- return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
- }
-
- //Add the tracking label to all resources created here
- labels := unstruct.GetLabels()
- //Check if labels exist for this object
- if labels == nil {
- labels = map[string]string{}
- }
- //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
- labels["emco/deployment-id"] = label
- unstruct.SetLabels(labels)
-
- // This checks if the resource we are creating has a podSpec in it
- // Eg: Deployment, StatefulSet, Job etc..
- // If a PodSpec is found, the label will be added to it too.
- //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
- connector.TagPodsIfPresent(unstruct, label)
-
- gvr := mapping.Resource
- var createdObj *unstructured.Unstructured
-
- switch mapping.Scope.Name() {
- case meta.RESTScopeNameNamespace:
- createdObj, err = dynClient.Resource(gvr).Namespace(namespace).Create(unstruct, metav1.CreateOptions{})
- case meta.RESTScopeNameRoot:
- createdObj, err = dynClient.Resource(gvr).Create(unstruct, metav1.CreateOptions{})
- default:
- return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
- }
-
- if err != nil {
- return "", pkgerrors.Wrap(err, "Create object error")
- }
-
- return createdObj.GetName(), nil
-}
-
-// Delete an existing resource hosted in a specific Kubernetes cluster
-func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error {
- if namespace == "" {
- namespace = "default"
- }
-
- //Decode the yaml file to create a runtime.Object
- unstruct := &unstructured.Unstructured{}
- //Ignore the returned obj as we expect the data in unstruct
- _, err := utils.DecodeYAMLData(data, unstruct)
- if err != nil {
- return pkgerrors.Wrap(err, "Decode deployment object error")
- }
-
- dynClient := client.GetDynamicClient()
- mapper := client.GetMapper()
-
- gvk := unstruct.GroupVersionKind()
- mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
- if err != nil {
- return pkgerrors.Wrap(err, "Mapping kind to resource error")
- }
-
- gvr := mapping.Resource
- deletePolicy := metav1.DeletePropagationForeground
- opts := &metav1.DeleteOptions{
- PropagationPolicy: &deletePolicy,
- }
-
- switch mapping.Scope.Name() {
- case meta.RESTScopeNameNamespace:
- err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
- case meta.RESTScopeNameRoot:
- err = dynClient.Resource(gvr).Delete(resname, opts)
- default:
- return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
- }
-
- if err != nil {
- return pkgerrors.Wrap(err, "Delete object error")
- }
- return nil
-}