summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/client
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2020-07-08 20:44:33 -0700
committerRitu Sood <ritu.sood@intel.com>2020-07-10 19:05:02 -0700
commitb986e8938aaa26945dc7dcdcb990ec8aa53afff0 (patch)
tree85fe870b3cf197fb865c1d02d482b95a169ee714 /src/rsync/pkg/client
parent9a9a6aedbd7a0dea952baad52d78cf43cd6e2ecf (diff)
Update Rsync
Changed Rsync to use ordered install. Changed to use cli-runtime instead of go-client. Based on code from repo https://github.com/johandry/klient Issue-ID: MULTICLOUD-1005 Signed-off-by: Ritu Sood <ritu.sood@intel.com> Change-Id: I4c2537cb74bd4d24a409cc1f0b7f9ee0875a4e39
Diffstat (limited to 'src/rsync/pkg/client')
-rw-r--r--src/rsync/pkg/client/apply.go93
-rw-r--r--src/rsync/pkg/client/client.go190
-rw-r--r--src/rsync/pkg/client/create.go55
-rw-r--r--src/rsync/pkg/client/delete.go95
-rw-r--r--src/rsync/pkg/client/factory.go291
-rw-r--r--src/rsync/pkg/client/helpers.go75
-rw-r--r--src/rsync/pkg/client/patch.go216
-rw-r--r--src/rsync/pkg/client/replace.go51
8 files changed, 1066 insertions, 0 deletions
diff --git a/src/rsync/pkg/client/apply.go b/src/rsync/pkg/client/apply.go
new file mode 100644
index 00000000..96233a31
--- /dev/null
+++ b/src/rsync/pkg/client/apply.go
@@ -0,0 +1,93 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/kubectl/pkg/util"
+)
+
+// Apply creates a resource with the given content
+func (c *Client) Apply(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.ApplyResource(r)
+}
+
+// ApplyFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) ApplyFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.ApplyResource(r)
+}
+
+// ApplyResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) ApplyResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+
+ // Is ServerSideApply requested
+ if c.ServerSideApply {
+ return r.Visit(serverSideApply)
+ }
+
+ return r.Visit(apply)
+}
+
+func apply(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("apply", info, err)
+ }
+
+ // If it does not exists, just create it
+ current, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, info.Export)
+ if err != nil {
+ if !errors.IsNotFound(err) {
+ return failedTo("retrieve current configuration", info, err)
+ }
+ if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
+ return failedTo("set annotation", info, err)
+ }
+ return create(info, nil)
+ }
+
+ // If exists, patch it
+ return patch(info, current)
+}
+
+func serverSideApply(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("serverside apply", info, err)
+ }
+
+ data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object)
+ if err != nil {
+ return failedTo("encode for the serverside apply", info, err)
+ }
+
+ options := metav1.PatchOptions{
+ // TODO: Find out how to get the force conflict flag
+ // Force: &forceConflicts,
+ // FieldManager: FieldManager,
+ }
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, types.ApplyPatchType, data, &options)
+ if err != nil {
+ return failedTo("serverside patch", info, err)
+ }
+ info.Refresh(obj, true)
+ return nil
+}
diff --git a/src/rsync/pkg/client/client.go b/src/rsync/pkg/client/client.go
new file mode 100644
index 00000000..0eaded22
--- /dev/null
+++ b/src/rsync/pkg/client/client.go
@@ -0,0 +1,190 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/kubectl/pkg/validation"
+)
+
+// DefaultValidation default action to validate. If `true` all resources by
+// default will be validated.
+const DefaultValidation = true
+
+// Client is a kubernetes client, like `kubectl`
+type Client struct {
+ Clientset *kubernetes.Clientset
+ factory *factory
+ validator validation.Schema
+ namespace string
+ enforceNamespace bool
+ forceConflicts bool
+ ServerSideApply bool
+}
+
+// Result is an alias for the Kubernetes CLI runtime resource.Result
+type Result = resource.Result
+
+// BuilderOptions parameters to create a Resource Builder
+type BuilderOptions struct {
+ Unstructured bool
+ Validate bool
+ Namespace string
+ LabelSelector string
+ FieldSelector string
+ All bool
+ AllNamespaces bool
+}
+
+// NewBuilderOptions creates a BuilderOptions with the default values for
+// the parameters to create a Resource Builder
+func NewBuilderOptions() *BuilderOptions {
+ return &BuilderOptions{
+ Unstructured: true,
+ Validate: true,
+ }
+}
+
+// NewE creates a kubernetes client, returns an error if fail
+func NewE(context, kubeconfig string, ns string) (*Client, error) {
+ var namespace string
+ var enforceNamespace bool
+ var err error
+ factory := newFactory(context, kubeconfig)
+
+ // If `true` it will always validate the given objects/resources
+ // Unless something different is specified in the NewBuilderOptions
+ validator, _ := factory.Validator(DefaultValidation)
+
+ if ns == "" {
+ namespace, enforceNamespace, err = factory.ToRawKubeConfigLoader().Namespace()
+ if err != nil {
+ namespace = v1.NamespaceDefault
+ enforceNamespace = true
+ }
+ } else {
+ namespace = ns
+ enforceNamespace = false
+ }
+ clientset, err := factory.KubernetesClientSet()
+ if err != nil {
+ return nil, err
+ }
+ if clientset == nil {
+ return nil, fmt.Errorf("cannot create a clientset from given context and kubeconfig")
+ }
+
+ return &Client{
+ factory: factory,
+ Clientset: clientset,
+ validator: validator,
+ namespace: namespace,
+ enforceNamespace: enforceNamespace,
+ }, nil
+}
+
+// New creates a kubernetes client
+func New(context, kubeconfig string, namespace string) *Client {
+ client, _ := NewE(context, kubeconfig, namespace)
+ return client
+}
+
+// Builder creates a resource builder
+func (c *Client) builder(opt *BuilderOptions) *resource.Builder {
+ validator := c.validator
+ namespace := c.namespace
+
+ if opt == nil {
+ opt = NewBuilderOptions()
+ } else {
+ if opt.Validate != DefaultValidation {
+ validator, _ = c.factory.Validator(opt.Validate)
+ }
+ if opt.Namespace != "" {
+ namespace = opt.Namespace
+ }
+ }
+
+ b := c.factory.NewBuilder()
+ if opt.Unstructured {
+ b = b.Unstructured()
+ }
+
+ return b.
+ Schema(validator).
+ ContinueOnError().
+ NamespaceParam(namespace).DefaultNamespace()
+}
+
+// ResultForFilenameParam returns the builder results for the given list of files or URLs
+func (c *Client) ResultForFilenameParam(filenames []string, opt *BuilderOptions) *Result {
+ filenameOptions := &resource.FilenameOptions{
+ Recursive: false,
+ Filenames: filenames,
+ }
+
+ return c.builder(opt).
+ FilenameParam(c.enforceNamespace, filenameOptions).
+ Flatten().
+ Do()
+}
+
+// ResultForReader returns the builder results for the given reader
+func (c *Client) ResultForReader(r io.Reader, opt *BuilderOptions) *Result {
+ return c.builder(opt).
+ Stream(r, "").
+ Flatten().
+ Do()
+}
+
+// func (c *Client) ResultForName(opt *BuilderOptions, names ...string) *Result {
+// return c.builder(opt).
+// LabelSelectorParam(opt.LabelSelector).
+// FieldSelectorParam(opt.FieldSelector).
+// SelectAllParam(opt.All).
+// AllNamespaces(opt.AllNamespaces).
+// ResourceTypeOrNameArgs(false, names...).RequireObject(false).
+// Flatten().
+// Do()
+// }
+
+// ResultForContent returns the builder results for the given content
+func (c *Client) ResultForContent(content []byte, opt *BuilderOptions) *Result {
+ b := bytes.NewBuffer(content)
+ return c.ResultForReader(b, opt)
+}
+
+func failedTo(action string, info *resource.Info, err error) error {
+ var resKind string
+ if info.Mapping != nil {
+ resKind = info.Mapping.GroupVersionKind.Kind + " "
+ }
+
+ return fmt.Errorf("cannot %s object Kind: %q, Name: %q, Namespace: %q. %s", action, resKind, info.Name, info.Namespace, err)
+}
+
+// IsReachable tests connectivity to the cluster
+func (c *Client) IsReachable() error {
+ client, _ := c.factory.KubernetesClientSet()
+ _, err := client.ServerVersion()
+ if err != nil {
+ return fmt.Errorf("Kubernetes cluster unreachable")
+ }
+ return nil
+} \ No newline at end of file
diff --git a/src/rsync/pkg/client/create.go b/src/rsync/pkg/client/create.go
new file mode 100644
index 00000000..755420ca
--- /dev/null
+++ b/src/rsync/pkg/client/create.go
@@ -0,0 +1,55 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+// Create creates a resource with the given content
+func (c *Client) Create(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.CreateResource(r)
+}
+
+// CreateFile creates a resource with the given content
+func (c *Client) CreateFile(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.CreateResource(r)
+}
+
+// CreateResource creates the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) CreateResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(create)
+}
+
+func create(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("create", info, err)
+ }
+
+ // TODO: If will be allow to do create then apply, here must be added the annotation as in Apply/Patch
+
+ options := metav1.CreateOptions{}
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options)
+ if err != nil {
+ return failedTo("create", info, err)
+ }
+ info.Refresh(obj, true)
+
+ return nil
+}
diff --git a/src/rsync/pkg/client/delete.go b/src/rsync/pkg/client/delete.go
new file mode 100644
index 00000000..1e6aa46a
--- /dev/null
+++ b/src/rsync/pkg/client/delete.go
@@ -0,0 +1,95 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+const (
+ // Period of time in seconds given to the resource to terminate gracefully when delete it (used when require to recreate the resource). Ignored if negative. Set to 1 for immediate shutdown. Can only be set to 0 when force is true (force deletion)
+ gracePeriod = -1
+ // If true, cascade the deletion of the resources managed by this resource (e.g. Pods created by a ReplicationController).
+ cascade = true
+)
+
+// Delete creates a resource with the given content
+func (c *Client) Delete(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.DeleteResource(r)
+}
+
+// DeleteFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) DeleteFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.DeleteResource(r)
+}
+
+// DeleteResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) DeleteResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(delete)
+}
+
+func delete(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("delete", info, err)
+ }
+
+ // TODO: Background or Foreground?
+ // policy := metav1.DeletePropagationForeground
+ policy := metav1.DeletePropagationBackground
+ options := metav1.DeleteOptions{
+ PropagationPolicy: &policy,
+ }
+
+ if _, err := deleteWithOptions(info, &options); err != nil {
+ return failedTo("delete", info, err)
+ }
+ return nil
+}
+
+func defaultDeleteOptions() *metav1.DeleteOptions {
+ // TODO: Change DryRun value when DryRun is implemented
+ dryRun := false
+
+ options := &metav1.DeleteOptions{}
+ if gracePeriod >= 0 {
+ options = metav1.NewDeleteOptions(int64(gracePeriod))
+ }
+
+ if dryRun {
+ options.DryRun = []string{metav1.DryRunAll}
+ }
+
+ // TODO: Background or Foreground?
+ // policy := metav1.DeletePropagationBackground
+ policy := metav1.DeletePropagationForeground
+ if !cascade {
+ policy = metav1.DeletePropagationOrphan
+ }
+ options.PropagationPolicy = &policy
+
+ return options
+}
+
+func deleteWithOptions(info *resource.Info, options *metav1.DeleteOptions) (runtime.Object, error) {
+ if options == nil {
+ options = defaultDeleteOptions()
+ }
+ return resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, options)
+}
diff --git a/src/rsync/pkg/client/factory.go b/src/rsync/pkg/client/factory.go
new file mode 100644
index 00000000..39c1a177
--- /dev/null
+++ b/src/rsync/pkg/client/factory.go
@@ -0,0 +1,291 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "path/filepath"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/genericclioptions"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/discovery"
+ diskcached "k8s.io/client-go/discovery/cached/disk"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/restmapper"
+ "k8s.io/client-go/tools/clientcmd"
+ "k8s.io/client-go/util/homedir"
+ "k8s.io/kubectl/pkg/util/openapi"
+ openapivalidation "k8s.io/kubectl/pkg/util/openapi/validation"
+ "k8s.io/kubectl/pkg/validation"
+)
+
+// factory implements the kubectl Factory interface which also requieres to
+// implement the genericclioptions.RESTClientGetter interface.
+// The Factory inerface provides abstractions that allow the Kubectl command to
+// be extended across multiple types of resources and different API sets.
+type factory struct {
+ KubeConfig string
+ Context string
+ initOpenAPIGetterOnce sync.Once
+ openAPIGetter openapi.Getter
+}
+
+// If multiple clients are created, this sync.once make sure the CRDs are added
+// only once into the API extensions v1 and v1beta schemes
+var addToSchemeOnce sync.Once
+
+var _ genericclioptions.RESTClientGetter = &factory{}
+
+// newFactory creates a new client factory which encapsulate a REST client getter
+func newFactory(context, kubeconfig string) *factory {
+ factory := &factory{
+ KubeConfig: kubeconfig,
+ Context: context,
+ }
+
+ // From: helm/pkg/kube/client.go > func New()
+ // Add CRDs to the scheme. They are missing by default.
+ addToSchemeOnce.Do(func() {
+ if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
+ panic(err)
+ }
+ if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
+ panic(err)
+ }
+ })
+
+ return factory
+}
+
+// BuildRESTConfig builds a kubernetes REST client factory using the following
+// rules from ToRawKubeConfigLoader()
+// func BuildRESTConfig(context, kubeconfig string) (*rest.Config, error) {
+// return newFactory(context, kubeconfig).ToRESTConfig()
+// }
+
+// ToRESTConfig creates a kubernetes REST client factory.
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRESTConfig() (*rest.Config, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go > func setKubernetesDefaults()
+ config, err := f.ToRawKubeConfigLoader().ClientConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ if config.GroupVersion == nil {
+ config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
+ }
+ if config.APIPath == "" {
+ config.APIPath = "/api"
+ }
+ if config.NegotiatedSerializer == nil {
+ // This codec config ensures the resources are not converted. Therefore, resources
+ // will not be round-tripped through internal versions. Defaulting does not happen
+ // on the client.
+ config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
+ }
+
+ rest.SetKubernetesDefaults(config)
+ return config, nil
+}
+
+// ToRawKubeConfigLoader creates a client factory using the following rules:
+// 1. builds from the given kubeconfig path, if not empty
+// 2. use the in cluster factory if running in-cluster
+// 3. gets the factory from KUBECONFIG env var
+// 4. Uses $HOME/.kube/factory
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRawKubeConfigLoader() clientcmd.ClientConfig {
+ loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
+ if len(f.KubeConfig) != 0 {
+ loadingRules.ExplicitPath = f.KubeConfig
+ }
+ configOverrides := &clientcmd.ConfigOverrides{
+ ClusterDefaults: clientcmd.ClusterDefaults,
+ }
+ if len(f.Context) != 0 {
+ configOverrides.CurrentContext = f.Context
+ }
+
+ return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
+}
+
+// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive
+var overlyCautiousIllegalFileCharacters = regexp.MustCompile(`[^(\w/\.)]`)
+
+// ToDiscoveryClient returns a CachedDiscoveryInterface using a computed RESTConfig
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
+ // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToDiscoveryClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ factory.Burst = 100
+ defaultHTTPCacheDir := filepath.Join(homedir.HomeDir(), ".kube", "http-cache")
+
+ // takes the parentDir and the host and comes up with a "usually non-colliding" name for the discoveryCacheDir
+ parentDir := filepath.Join(homedir.HomeDir(), ".kube", "cache", "discovery")
+ // strip the optional scheme from host if its there:
+ schemelessHost := strings.Replace(strings.Replace(factory.Host, "https://", "", 1), "http://", "", 1)
+ // now do a simple collapse of non-AZ09 characters. Collisions are possible but unlikely. Even if we do collide the problem is short lived
+ safeHost := overlyCautiousIllegalFileCharacters.ReplaceAllString(schemelessHost, "_")
+ discoveryCacheDir := filepath.Join(parentDir, safeHost)
+
+ return diskcached.NewCachedDiscoveryClientForConfig(factory, discoveryCacheDir, defaultHTTPCacheDir, time.Duration(10*time.Minute))
+}
+
+// ToRESTMapper returns a mapper
+// It's required to implement the interface genericclioptions.RESTClientGetter
+func (f *factory) ToRESTMapper() (meta.RESTMapper, error) {
+ // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToRESTMapper()
+ discoveryClient, err := f.ToDiscoveryClient()
+ if err != nil {
+ return nil, err
+ }
+
+ mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
+ expander := restmapper.NewShortcutExpander(mapper, discoveryClient)
+ return expander, nil
+}
+
+// KubernetesClientSet creates a kubernetes clientset from the configuration
+// It's required to implement the Factory interface
+func (f *factory) KubernetesClientSet() (*kubernetes.Clientset, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) KubernetesClientSet()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return kubernetes.NewForConfig(factory)
+}
+
+// DynamicClient creates a dynamic client from the configuration
+// It's required to implement the Factory interface
+func (f *factory) DynamicClient() (dynamic.Interface, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) DynamicClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return dynamic.NewForConfig(factory)
+}
+
+// NewBuilder returns a new resource builder for structured api objects.
+// It's required to implement the Factory interface
+func (f *factory) NewBuilder() *resource.Builder {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) NewBuilder()
+ return resource.NewBuilder(f)
+}
+
+// RESTClient creates a REST client from the configuration
+// It's required to implement the Factory interface
+func (f *factory) RESTClient() (*rest.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) RESTClient()
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+ return rest.RESTClientFor(factory)
+}
+
+func (f *factory) configForMapping(mapping *meta.RESTMapping) (*rest.Config, error) {
+ factory, err := f.ToRESTConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ gvk := mapping.GroupVersionKind
+ factory.APIPath = "/apis"
+ if gvk.Group == corev1.GroupName {
+ factory.APIPath = "/api"
+ }
+ gv := gvk.GroupVersion()
+ factory.GroupVersion = &gv
+
+ return factory, nil
+}
+
+// ClientForMapping creates a resource REST client from the given mappings
+// It's required to implement the Factory interface
+func (f *factory) ClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) ClientForMapping()
+ factory, err := f.configForMapping(mapping)
+ if err != nil {
+ return nil, err
+ }
+
+ return rest.RESTClientFor(factory)
+}
+
+// UnstructuredClientForMapping creates a unstructured resource REST client from the given mappings
+// It's required to implement the Factory interface
+func (f *factory) UnstructuredClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) UnstructuredClientForMapping()
+ factory, err := f.configForMapping(mapping)
+ if err != nil {
+ return nil, err
+ }
+ factory.ContentConfig = resource.UnstructuredPlusDefaultContentConfig()
+
+ return rest.RESTClientFor(factory)
+}
+
+// Validator returns a schema that can validate objects stored on disk.
+// It's required to implement the Factory interface
+func (f *factory) Validator(validate bool) (validation.Schema, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) Validator(bool)
+ if !validate {
+ return validation.NullSchema{}, nil
+ }
+
+ resources, err := f.OpenAPISchema()
+ if err != nil {
+ return nil, err
+ }
+
+ return validation.ConjunctiveSchema{
+ openapivalidation.NewSchemaValidation(resources),
+ validation.NoDoubleKeySchema{},
+ }, nil
+}
+
+// OpenAPISchema returns metadata and structural information about Kubernetes object definitions.
+// It's required to implement the Factory interface
+func (f *factory) OpenAPISchema() (openapi.Resources, error) {
+ // From: k8s.io/kubectl/pkg/cmd/util/factory_client-access.go > func (*factoryImpl) OpenAPISchema()
+ discovery, err := f.ToDiscoveryClient()
+ if err != nil {
+ return nil, err
+ }
+
+ f.initOpenAPIGetterOnce.Do(func() {
+ // Create the caching OpenAPIGetter
+ f.openAPIGetter = openapi.NewOpenAPIGetter(discovery)
+ })
+
+ // Delegate to the OpenAPIGetter
+ return f.openAPIGetter.Get()
+}
diff --git a/src/rsync/pkg/client/helpers.go b/src/rsync/pkg/client/helpers.go
new file mode 100644
index 00000000..de6ed93f
--- /dev/null
+++ b/src/rsync/pkg/client/helpers.go
@@ -0,0 +1,75 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// CreateNamespace creates a namespace with the given name
+func (c *Client) CreateNamespace(namespace string) error {
+ ns := &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: namespace,
+ Labels: map[string]string{
+ "name": namespace,
+ },
+ },
+ }
+ _, err := c.Clientset.CoreV1().Namespaces().Create(ns)
+ // if errors.IsAlreadyExists(err) {
+ // // If it failed because the NS is already there, then do not return such error
+ // return nil
+ // }
+
+ return err
+}
+
+// DeleteNamespace deletes the namespace with the given name
+func (c *Client) DeleteNamespace(namespace string) error {
+ return c.Clientset.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{})
+}
+
+// NodesReady returns the number of nodes ready
+func (c *Client) NodesReady() (ready int, total int, err error) {
+ nodes, err := c.Clientset.CoreV1().Nodes().List(metav1.ListOptions{})
+ if err != nil {
+ return 0, 0, err
+ }
+ total = len(nodes.Items)
+ if total == 0 {
+ return 0, 0, nil
+ }
+ for _, n := range nodes.Items {
+ for _, c := range n.Status.Conditions {
+ if c.Type == "Ready" && c.Status == "True" {
+ ready++
+ break
+ }
+ }
+ }
+
+ return ready, len(nodes.Items), nil
+}
+
+// Version returns the cluster version. It can be used to verify if the cluster
+// is reachable. It will return an error if failed to connect.
+func (c *Client) Version() (string, error) {
+ v, err := c.Clientset.ServerVersion()
+ if err != nil {
+ return "", err
+ }
+
+ return v.String(), nil
+}
diff --git a/src/rsync/pkg/client/patch.go b/src/rsync/pkg/client/patch.go
new file mode 100644
index 00000000..3a620f49
--- /dev/null
+++ b/src/rsync/pkg/client/patch.go
@@ -0,0 +1,216 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/jonboulle/clockwork"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/jsonmergepatch"
+ "k8s.io/apimachinery/pkg/util/mergepatch"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/cli-runtime/pkg/resource"
+ oapi "k8s.io/kube-openapi/pkg/util/proto"
+ "k8s.io/kubectl/pkg/scheme"
+ "k8s.io/kubectl/pkg/util"
+ "k8s.io/kubectl/pkg/util/openapi"
+)
+
+const (
+ // overwrite if true, automatically resolve conflicts between the modified and live configuration by using values from the modified configuration
+ overwrite = true
+ // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
+ maxPatchRetry = 5
+ // backOffPeriod is the period to back off when apply patch results in error.
+ backOffPeriod = 1 * time.Second
+ // how many times we can retry before back off
+ triesBeforeBackOff = 1
+ // force if true, immediately remove resources from API and bypass graceful deletion. Note that immediate deletion of some resources may result in inconsistency or data loss and requires confirmation.
+ force = false
+ // timeout waiting for the resource to be delete if it needs to be recreated
+ timeout = 0
+)
+
+// patch tries to patch an OpenAPI resource
+func patch(info *resource.Info, current runtime.Object) error {
+ // From: k8s.io/kubectl/pkg/cmd/apply/apply.go & patcher.go
+ modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ if err != nil {
+ return fmt.Errorf("retrieving modified configuration. %s", err)
+ }
+
+ metadata, _ := meta.Accessor(current)
+ annotationMap := metadata.GetAnnotations()
+ if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok {
+ // TODO: Find what to do with the warnings, they should not be printed
+ fmt.Fprintf(os.Stderr, "Warning: apply should be used on resource created by apply")
+ }
+
+ patchBytes, patchObject, err := patchSimple(current, modified, info)
+
+ var getErr error
+ for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
+ if i > triesBeforeBackOff {
+ clockwork.NewRealClock().Sleep(backOffPeriod)
+ }
+ current, getErr = resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, false)
+ if getErr != nil {
+ return getErr
+ }
+ patchBytes, patchObject, err = patchSimple(current, modified, info)
+ }
+ if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && force {
+ patchBytes, patchObject, err = deleteAndCreate(info, patchBytes)
+ }
+
+ info.Refresh(patchObject, true)
+
+ return nil
+}
+
+func patchSimple(currentObj runtime.Object, modified []byte, info *resource.Info) ([]byte, runtime.Object, error) {
+ // Serialize the current configuration of the object from the server.
+ current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("serializing current configuration. %s", err)
+ }
+
+ // Retrieve the original configuration of the object from the annotation.
+ original, err := util.GetOriginalConfiguration(currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("retrieving original configuration. %s", err)
+ }
+
+ var patchType types.PatchType
+ var patch []byte
+ var lookupPatchMeta strategicpatch.LookupPatchMeta
+ var schema oapi.Schema
+
+ // Create the versioned struct from the type defined in the restmapping
+ // (which is the API version we'll be submitting the patch to)
+ versionedObject, err := scheme.Scheme.New(info.Mapping.GroupVersionKind)
+
+ // DEBUG:
+ // fmt.Printf("Modified: %v\n", string(modified))
+ // fmt.Printf("Current: %v\n", string(current))
+ // fmt.Printf("Original: %v\n", string(original))
+ // fmt.Printf("versionedObj: %v\n", versionedObject)
+ // fmt.Printf("Error: %+v\nIsNotRegisteredError: %t\n", err, runtime.IsNotRegisteredError(err))
+
+ switch {
+ case runtime.IsNotRegisteredError(err):
+ // fall back to generic JSON merge patch
+ patchType = types.MergePatchType
+ preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
+ mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
+ patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
+ if err != nil {
+ if mergepatch.IsPreconditionFailed(err) {
+ return nil, nil, fmt.Errorf("At least one of apiVersion, kind and name was changed")
+ }
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ case err != nil:
+ return nil, nil, fmt.Errorf("getting instance of versioned object. %s", err)
+ case err == nil:
+ // Compute a three way strategic merge patch to send to server.
+ patchType = types.StrategicMergePatchType
+
+ // Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
+ // Otherwise, fall back to baked-in types.
+ var openapiSchema openapi.Resources
+ if openapiSchema != nil {
+ if schema = openapiSchema.LookupResource(info.Mapping.GroupVersionKind); schema != nil {
+ lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
+ if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite); err == nil {
+ patchType = types.StrategicMergePatchType
+ patch = openapiPatch
+ // TODO: In case it's necessary to report warnings
+ // } else {
+ // log.Printf("Warning: error calculating patch from openapi spec: %s", err)
+ }
+ }
+ }
+
+ if patch == nil {
+ lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ }
+ }
+
+ if string(patch) == "{}" {
+ return patch, currentObj, nil
+ }
+
+ patchedObj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, patchType, patch, nil)
+ return patch, patchedObj, err
+}
+
+func deleteAndCreate(info *resource.Info, modified []byte) ([]byte, runtime.Object, error) {
+ delOptions := defaultDeleteOptions()
+ if _, err := deleteWithOptions(info, delOptions); err != nil {
+ return nil, nil, err
+ }
+
+ helper := resource.NewHelper(info.Client, info.Mapping)
+
+ // TODO: make a waiter and use it
+ if err := wait.PollImmediate(1*time.Second, time.Duration(timeout), func() (bool, error) {
+ if _, err := helper.Get(info.Namespace, info.Name, false); !errors.IsNotFound(err) {
+ return false, err
+ }
+ return true, nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // TODO: Check what GetModifiedConfiguration does, this could be an encode - decode waste of time
+ // modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ // if err != nil {
+ // return nil, nil, fmt.Errorf("retrieving modified configuration. %s", err)
+ // }
+ versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ options := metav1.CreateOptions{}
+ createdObject, err := helper.Create(info.Namespace, true, versionedObject, &options)
+ if err != nil {
+ // restore the original object if we fail to create the new one
+ // but still propagate and advertise error to user
+ recreated, recreateErr := helper.Create(info.Namespace, true, info.Object, &options)
+ if recreateErr != nil {
+ err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one. %v.\n\nAdditionally, an error occurred attempting to restore the original object: %v", err, recreateErr)
+ } else {
+ createdObject = recreated
+ }
+ }
+ return modified, createdObject, err
+}
diff --git a/src/rsync/pkg/client/replace.go b/src/rsync/pkg/client/replace.go
new file mode 100644
index 00000000..9aba8aca
--- /dev/null
+++ b/src/rsync/pkg/client/replace.go
@@ -0,0 +1,51 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+// Replace creates a resource with the given content
+func (c *Client) Replace(content []byte) error {
+ r := c.ResultForContent(content, nil)
+ return c.ReplaceResource(r)
+}
+
+// ReplaceFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs
+func (c *Client) ReplaceFiles(filenames ...string) error {
+ r := c.ResultForFilenameParam(filenames, nil)
+ return c.ReplaceResource(r)
+}
+
+// ReplaceResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent`
+func (c *Client) ReplaceResource(r *resource.Result) error {
+ if err := r.Err(); err != nil {
+ return err
+ }
+ return r.Visit(replace)
+}
+
+func replace(info *resource.Info, err error) error {
+ if err != nil {
+ return failedTo("replace", info, err)
+ }
+
+ obj, err := resource.NewHelper(info.Client, info.Mapping).Replace(info.Namespace, info.Name, true, info.Object)
+ if err != nil {
+ return failedTo("replace", info, err)
+ }
+ info.Refresh(obj, true)
+
+ return nil
+}