diff options
Diffstat (limited to 'src/rsync/pkg/client')
-rw-r--r-- | src/rsync/pkg/client/apply.go | 93 | ||||
-rw-r--r-- | src/rsync/pkg/client/client.go | 190 | ||||
-rw-r--r-- | src/rsync/pkg/client/create.go | 55 | ||||
-rw-r--r-- | src/rsync/pkg/client/delete.go | 95 | ||||
-rw-r--r-- | src/rsync/pkg/client/factory.go | 291 | ||||
-rw-r--r-- | src/rsync/pkg/client/helpers.go | 75 | ||||
-rw-r--r-- | src/rsync/pkg/client/patch.go | 216 | ||||
-rw-r--r-- | src/rsync/pkg/client/replace.go | 51 |
8 files changed, 1066 insertions, 0 deletions
diff --git a/src/rsync/pkg/client/apply.go b/src/rsync/pkg/client/apply.go new file mode 100644 index 00000000..96233a31 --- /dev/null +++ b/src/rsync/pkg/client/apply.go @@ -0,0 +1,93 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/kubectl/pkg/util" +) + +// Apply creates a resource with the given content +func (c *Client) Apply(content []byte) error { + r := c.ResultForContent(content, nil) + return c.ApplyResource(r) +} + +// ApplyFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) ApplyFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.ApplyResource(r) +} + +// ApplyResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) ApplyResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + + // Is ServerSideApply requested + if c.ServerSideApply { + return r.Visit(serverSideApply) + } + + return r.Visit(apply) +} + +func apply(info *resource.Info, err error) error { + if err != nil { + return failedTo("apply", info, err) + } + + // If it does not exists, just create it + current, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, info.Export) + if err != nil { + if !errors.IsNotFound(err) { + return failedTo("retrieve current configuration", info, err) + } + if err := util.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil { + return failedTo("set annotation", info, err) + } + return create(info, nil) + } + + // If exists, patch it + return patch(info, current) +} + +func serverSideApply(info *resource.Info, err error) error { + if err != nil { + return failedTo("serverside apply", info, err) + } + + data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object) + if err != nil { + return failedTo("encode for the serverside apply", info, err) + } + + options := metav1.PatchOptions{ + // TODO: Find out how to get the force conflict flag + // Force: &forceConflicts, + // FieldManager: FieldManager, + } + obj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, types.ApplyPatchType, data, &options) + if err != nil { + return failedTo("serverside patch", info, err) + } + info.Refresh(obj, true) + return nil +} diff --git a/src/rsync/pkg/client/client.go b/src/rsync/pkg/client/client.go new file mode 100644 index 00000000..0eaded22 --- /dev/null +++ b/src/rsync/pkg/client/client.go @@ -0,0 +1,190 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "bytes" + "fmt" + "io" + + v1 "k8s.io/api/core/v1" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + "k8s.io/kubectl/pkg/validation" +) + +// DefaultValidation default action to validate. If `true` all resources by +// default will be validated. +const DefaultValidation = true + +// Client is a kubernetes client, like `kubectl` +type Client struct { + Clientset *kubernetes.Clientset + factory *factory + validator validation.Schema + namespace string + enforceNamespace bool + forceConflicts bool + ServerSideApply bool +} + +// Result is an alias for the Kubernetes CLI runtime resource.Result +type Result = resource.Result + +// BuilderOptions parameters to create a Resource Builder +type BuilderOptions struct { + Unstructured bool + Validate bool + Namespace string + LabelSelector string + FieldSelector string + All bool + AllNamespaces bool +} + +// NewBuilderOptions creates a BuilderOptions with the default values for +// the parameters to create a Resource Builder +func NewBuilderOptions() *BuilderOptions { + return &BuilderOptions{ + Unstructured: true, + Validate: true, + } +} + +// NewE creates a kubernetes client, returns an error if fail +func NewE(context, kubeconfig string, ns string) (*Client, error) { + var namespace string + var enforceNamespace bool + var err error + factory := newFactory(context, kubeconfig) + + // If `true` it will always validate the given objects/resources + // Unless something different is specified in the NewBuilderOptions + validator, _ := factory.Validator(DefaultValidation) + + if ns == "" { + namespace, enforceNamespace, err = factory.ToRawKubeConfigLoader().Namespace() + if err != nil { + namespace = v1.NamespaceDefault + enforceNamespace = true + } + } else { + namespace = ns + enforceNamespace = false + } + clientset, err := factory.KubernetesClientSet() + if err != nil { + return nil, err + } + if clientset == nil { + return nil, fmt.Errorf("cannot create a clientset from given context and kubeconfig") + } + + return &Client{ + factory: factory, + Clientset: clientset, + validator: validator, + namespace: namespace, + enforceNamespace: enforceNamespace, + }, nil +} + +// New creates a kubernetes client +func New(context, kubeconfig string, namespace string) *Client { + client, _ := NewE(context, kubeconfig, namespace) + return client +} + +// Builder creates a resource builder +func (c *Client) builder(opt *BuilderOptions) *resource.Builder { + validator := c.validator + namespace := c.namespace + + if opt == nil { + opt = NewBuilderOptions() + } else { + if opt.Validate != DefaultValidation { + validator, _ = c.factory.Validator(opt.Validate) + } + if opt.Namespace != "" { + namespace = opt.Namespace + } + } + + b := c.factory.NewBuilder() + if opt.Unstructured { + b = b.Unstructured() + } + + return b. + Schema(validator). + ContinueOnError(). + NamespaceParam(namespace).DefaultNamespace() +} + +// ResultForFilenameParam returns the builder results for the given list of files or URLs +func (c *Client) ResultForFilenameParam(filenames []string, opt *BuilderOptions) *Result { + filenameOptions := &resource.FilenameOptions{ + Recursive: false, + Filenames: filenames, + } + + return c.builder(opt). + FilenameParam(c.enforceNamespace, filenameOptions). + Flatten(). + Do() +} + +// ResultForReader returns the builder results for the given reader +func (c *Client) ResultForReader(r io.Reader, opt *BuilderOptions) *Result { + return c.builder(opt). + Stream(r, ""). + Flatten(). + Do() +} + +// func (c *Client) ResultForName(opt *BuilderOptions, names ...string) *Result { +// return c.builder(opt). +// LabelSelectorParam(opt.LabelSelector). +// FieldSelectorParam(opt.FieldSelector). +// SelectAllParam(opt.All). +// AllNamespaces(opt.AllNamespaces). +// ResourceTypeOrNameArgs(false, names...).RequireObject(false). +// Flatten(). +// Do() +// } + +// ResultForContent returns the builder results for the given content +func (c *Client) ResultForContent(content []byte, opt *BuilderOptions) *Result { + b := bytes.NewBuffer(content) + return c.ResultForReader(b, opt) +} + +func failedTo(action string, info *resource.Info, err error) error { + var resKind string + if info.Mapping != nil { + resKind = info.Mapping.GroupVersionKind.Kind + " " + } + + return fmt.Errorf("cannot %s object Kind: %q, Name: %q, Namespace: %q. %s", action, resKind, info.Name, info.Namespace, err) +} + +// IsReachable tests connectivity to the cluster +func (c *Client) IsReachable() error { + client, _ := c.factory.KubernetesClientSet() + _, err := client.ServerVersion() + if err != nil { + return fmt.Errorf("Kubernetes cluster unreachable") + } + return nil +}
\ No newline at end of file diff --git a/src/rsync/pkg/client/create.go b/src/rsync/pkg/client/create.go new file mode 100644 index 00000000..755420ca --- /dev/null +++ b/src/rsync/pkg/client/create.go @@ -0,0 +1,55 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/resource" +) + +// Create creates a resource with the given content +func (c *Client) Create(content []byte) error { + r := c.ResultForContent(content, nil) + return c.CreateResource(r) +} + +// CreateFile creates a resource with the given content +func (c *Client) CreateFile(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.CreateResource(r) +} + +// CreateResource creates the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) CreateResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(create) +} + +func create(info *resource.Info, err error) error { + if err != nil { + return failedTo("create", info, err) + } + + // TODO: If will be allow to do create then apply, here must be added the annotation as in Apply/Patch + + options := metav1.CreateOptions{} + obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options) + if err != nil { + return failedTo("create", info, err) + } + info.Refresh(obj, true) + + return nil +} diff --git a/src/rsync/pkg/client/delete.go b/src/rsync/pkg/client/delete.go new file mode 100644 index 00000000..1e6aa46a --- /dev/null +++ b/src/rsync/pkg/client/delete.go @@ -0,0 +1,95 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/cli-runtime/pkg/resource" +) + +const ( + // Period of time in seconds given to the resource to terminate gracefully when delete it (used when require to recreate the resource). Ignored if negative. Set to 1 for immediate shutdown. Can only be set to 0 when force is true (force deletion) + gracePeriod = -1 + // If true, cascade the deletion of the resources managed by this resource (e.g. Pods created by a ReplicationController). + cascade = true +) + +// Delete creates a resource with the given content +func (c *Client) Delete(content []byte) error { + r := c.ResultForContent(content, nil) + return c.DeleteResource(r) +} + +// DeleteFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) DeleteFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.DeleteResource(r) +} + +// DeleteResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) DeleteResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(delete) +} + +func delete(info *resource.Info, err error) error { + if err != nil { + return failedTo("delete", info, err) + } + + // TODO: Background or Foreground? + // policy := metav1.DeletePropagationForeground + policy := metav1.DeletePropagationBackground + options := metav1.DeleteOptions{ + PropagationPolicy: &policy, + } + + if _, err := deleteWithOptions(info, &options); err != nil { + return failedTo("delete", info, err) + } + return nil +} + +func defaultDeleteOptions() *metav1.DeleteOptions { + // TODO: Change DryRun value when DryRun is implemented + dryRun := false + + options := &metav1.DeleteOptions{} + if gracePeriod >= 0 { + options = metav1.NewDeleteOptions(int64(gracePeriod)) + } + + if dryRun { + options.DryRun = []string{metav1.DryRunAll} + } + + // TODO: Background or Foreground? + // policy := metav1.DeletePropagationBackground + policy := metav1.DeletePropagationForeground + if !cascade { + policy = metav1.DeletePropagationOrphan + } + options.PropagationPolicy = &policy + + return options +} + +func deleteWithOptions(info *resource.Info, options *metav1.DeleteOptions) (runtime.Object, error) { + if options == nil { + options = defaultDeleteOptions() + } + return resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, options) +} diff --git a/src/rsync/pkg/client/factory.go b/src/rsync/pkg/client/factory.go new file mode 100644 index 00000000..39c1a177 --- /dev/null +++ b/src/rsync/pkg/client/factory.go @@ -0,0 +1,291 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "path/filepath" + "regexp" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/discovery" + diskcached "k8s.io/client-go/discovery/cached/disk" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "k8s.io/kubectl/pkg/util/openapi" + openapivalidation "k8s.io/kubectl/pkg/util/openapi/validation" + "k8s.io/kubectl/pkg/validation" +) + +// factory implements the kubectl Factory interface which also requieres to +// implement the genericclioptions.RESTClientGetter interface. +// The Factory inerface provides abstractions that allow the Kubectl command to +// be extended across multiple types of resources and different API sets. +type factory struct { + KubeConfig string + Context string + initOpenAPIGetterOnce sync.Once + openAPIGetter openapi.Getter +} + +// If multiple clients are created, this sync.once make sure the CRDs are added +// only once into the API extensions v1 and v1beta schemes +var addToSchemeOnce sync.Once + +var _ genericclioptions.RESTClientGetter = &factory{} + +// newFactory creates a new client factory which encapsulate a REST client getter +func newFactory(context, kubeconfig string) *factory { + factory := &factory{ + KubeConfig: kubeconfig, + Context: context, + } + + // From: helm/pkg/kube/client.go > func New() + // Add CRDs to the scheme. They are missing by default. + addToSchemeOnce.Do(func() { + if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { + panic(err) + } + if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil { + panic(err) + } + }) + + return factory +} + +// BuildRESTConfig builds a kubernetes REST client factory using the following +// rules from ToRawKubeConfigLoader() +// func BuildRESTConfig(context, kubeconfig string) (*rest.Config, error) { +// return newFactory(context, kubeconfig).ToRESTConfig() +// } + +// ToRESTConfig creates a kubernetes REST client factory. +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRESTConfig() (*rest.Config, error) { + // From: k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go > func setKubernetesDefaults() + config, err := f.ToRawKubeConfigLoader().ClientConfig() + if err != nil { + return nil, err + } + + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + } + if config.APIPath == "" { + config.APIPath = "/api" + } + if config.NegotiatedSerializer == nil { + // This codec config ensures the resources are not converted. Therefore, resources + // will not be round-tripped through internal versions. Defaulting does not happen + // on the client. + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + } + + rest.SetKubernetesDefaults(config) + return config, nil +} + +// ToRawKubeConfigLoader creates a client factory using the following rules: +// 1. builds from the given kubeconfig path, if not empty +// 2. use the in cluster factory if running in-cluster +// 3. gets the factory from KUBECONFIG env var +// 4. Uses $HOME/.kube/factory +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRawKubeConfigLoader() clientcmd.ClientConfig { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + if len(f.KubeConfig) != 0 { + loadingRules.ExplicitPath = f.KubeConfig + } + configOverrides := &clientcmd.ConfigOverrides{ + ClusterDefaults: clientcmd.ClusterDefaults, + } + if len(f.Context) != 0 { + configOverrides.CurrentContext = f.Context + } + + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) +} + +// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive +var overlyCautiousIllegalFileCharacters = regexp.MustCompile(`[^(\w/\.)]`) + +// ToDiscoveryClient returns a CachedDiscoveryInterface using a computed RESTConfig +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { + // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToDiscoveryClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + factory.Burst = 100 + defaultHTTPCacheDir := filepath.Join(homedir.HomeDir(), ".kube", "http-cache") + + // takes the parentDir and the host and comes up with a "usually non-colliding" name for the discoveryCacheDir + parentDir := filepath.Join(homedir.HomeDir(), ".kube", "cache", "discovery") + // strip the optional scheme from host if its there: + schemelessHost := strings.Replace(strings.Replace(factory.Host, "https://", "", 1), "http://", "", 1) + // now do a simple collapse of non-AZ09 characters. Collisions are possible but unlikely. Even if we do collide the problem is short lived + safeHost := overlyCautiousIllegalFileCharacters.ReplaceAllString(schemelessHost, "_") + discoveryCacheDir := filepath.Join(parentDir, safeHost) + + return diskcached.NewCachedDiscoveryClientForConfig(factory, discoveryCacheDir, defaultHTTPCacheDir, time.Duration(10*time.Minute)) +} + +// ToRESTMapper returns a mapper +// It's required to implement the interface genericclioptions.RESTClientGetter +func (f *factory) ToRESTMapper() (meta.RESTMapper, error) { + // From: k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go > func (*configFlags) ToRESTMapper() + discoveryClient, err := f.ToDiscoveryClient() + if err != nil { + return nil, err + } + + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + expander := restmapper.NewShortcutExpander(mapper, discoveryClient) + return expander, nil +} + +// KubernetesClientSet creates a kubernetes clientset from the configuration +// It's required to implement the Factory interface +func (f *factory) KubernetesClientSet() (*kubernetes.Clientset, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) KubernetesClientSet() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(factory) +} + +// DynamicClient creates a dynamic client from the configuration +// It's required to implement the Factory interface +func (f *factory) DynamicClient() (dynamic.Interface, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) DynamicClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return dynamic.NewForConfig(factory) +} + +// NewBuilder returns a new resource builder for structured api objects. +// It's required to implement the Factory interface +func (f *factory) NewBuilder() *resource.Builder { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) NewBuilder() + return resource.NewBuilder(f) +} + +// RESTClient creates a REST client from the configuration +// It's required to implement the Factory interface +func (f *factory) RESTClient() (*rest.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) RESTClient() + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + return rest.RESTClientFor(factory) +} + +func (f *factory) configForMapping(mapping *meta.RESTMapping) (*rest.Config, error) { + factory, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + + gvk := mapping.GroupVersionKind + factory.APIPath = "/apis" + if gvk.Group == corev1.GroupName { + factory.APIPath = "/api" + } + gv := gvk.GroupVersion() + factory.GroupVersion = &gv + + return factory, nil +} + +// ClientForMapping creates a resource REST client from the given mappings +// It's required to implement the Factory interface +func (f *factory) ClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) ClientForMapping() + factory, err := f.configForMapping(mapping) + if err != nil { + return nil, err + } + + return rest.RESTClientFor(factory) +} + +// UnstructuredClientForMapping creates a unstructured resource REST client from the given mappings +// It's required to implement the Factory interface +func (f *factory) UnstructuredClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) UnstructuredClientForMapping() + factory, err := f.configForMapping(mapping) + if err != nil { + return nil, err + } + factory.ContentConfig = resource.UnstructuredPlusDefaultContentConfig() + + return rest.RESTClientFor(factory) +} + +// Validator returns a schema that can validate objects stored on disk. +// It's required to implement the Factory interface +func (f *factory) Validator(validate bool) (validation.Schema, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client_access.go > func (*factoryImpl) Validator(bool) + if !validate { + return validation.NullSchema{}, nil + } + + resources, err := f.OpenAPISchema() + if err != nil { + return nil, err + } + + return validation.ConjunctiveSchema{ + openapivalidation.NewSchemaValidation(resources), + validation.NoDoubleKeySchema{}, + }, nil +} + +// OpenAPISchema returns metadata and structural information about Kubernetes object definitions. +// It's required to implement the Factory interface +func (f *factory) OpenAPISchema() (openapi.Resources, error) { + // From: k8s.io/kubectl/pkg/cmd/util/factory_client-access.go > func (*factoryImpl) OpenAPISchema() + discovery, err := f.ToDiscoveryClient() + if err != nil { + return nil, err + } + + f.initOpenAPIGetterOnce.Do(func() { + // Create the caching OpenAPIGetter + f.openAPIGetter = openapi.NewOpenAPIGetter(discovery) + }) + + // Delegate to the OpenAPIGetter + return f.openAPIGetter.Get() +} diff --git a/src/rsync/pkg/client/helpers.go b/src/rsync/pkg/client/helpers.go new file mode 100644 index 00000000..de6ed93f --- /dev/null +++ b/src/rsync/pkg/client/helpers.go @@ -0,0 +1,75 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// CreateNamespace creates a namespace with the given name +func (c *Client) CreateNamespace(namespace string) error { + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Labels: map[string]string{ + "name": namespace, + }, + }, + } + _, err := c.Clientset.CoreV1().Namespaces().Create(ns) + // if errors.IsAlreadyExists(err) { + // // If it failed because the NS is already there, then do not return such error + // return nil + // } + + return err +} + +// DeleteNamespace deletes the namespace with the given name +func (c *Client) DeleteNamespace(namespace string) error { + return c.Clientset.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{}) +} + +// NodesReady returns the number of nodes ready +func (c *Client) NodesReady() (ready int, total int, err error) { + nodes, err := c.Clientset.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return 0, 0, err + } + total = len(nodes.Items) + if total == 0 { + return 0, 0, nil + } + for _, n := range nodes.Items { + for _, c := range n.Status.Conditions { + if c.Type == "Ready" && c.Status == "True" { + ready++ + break + } + } + } + + return ready, len(nodes.Items), nil +} + +// Version returns the cluster version. It can be used to verify if the cluster +// is reachable. It will return an error if failed to connect. +func (c *Client) Version() (string, error) { + v, err := c.Clientset.ServerVersion() + if err != nil { + return "", err + } + + return v.String(), nil +} diff --git a/src/rsync/pkg/client/patch.go b/src/rsync/pkg/client/patch.go new file mode 100644 index 00000000..3a620f49 --- /dev/null +++ b/src/rsync/pkg/client/patch.go @@ -0,0 +1,216 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "fmt" + "os" + "time" + + "github.com/jonboulle/clockwork" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/jsonmergepatch" + "k8s.io/apimachinery/pkg/util/mergepatch" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + oapi "k8s.io/kube-openapi/pkg/util/proto" + "k8s.io/kubectl/pkg/scheme" + "k8s.io/kubectl/pkg/util" + "k8s.io/kubectl/pkg/util/openapi" +) + +const ( + // overwrite if true, automatically resolve conflicts between the modified and live configuration by using values from the modified configuration + overwrite = true + // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure + maxPatchRetry = 5 + // backOffPeriod is the period to back off when apply patch results in error. + backOffPeriod = 1 * time.Second + // how many times we can retry before back off + triesBeforeBackOff = 1 + // force if true, immediately remove resources from API and bypass graceful deletion. Note that immediate deletion of some resources may result in inconsistency or data loss and requires confirmation. + force = false + // timeout waiting for the resource to be delete if it needs to be recreated + timeout = 0 +) + +// patch tries to patch an OpenAPI resource +func patch(info *resource.Info, current runtime.Object) error { + // From: k8s.io/kubectl/pkg/cmd/apply/apply.go & patcher.go + modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme) + if err != nil { + return fmt.Errorf("retrieving modified configuration. %s", err) + } + + metadata, _ := meta.Accessor(current) + annotationMap := metadata.GetAnnotations() + if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok { + // TODO: Find what to do with the warnings, they should not be printed + fmt.Fprintf(os.Stderr, "Warning: apply should be used on resource created by apply") + } + + patchBytes, patchObject, err := patchSimple(current, modified, info) + + var getErr error + for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ { + if i > triesBeforeBackOff { + clockwork.NewRealClock().Sleep(backOffPeriod) + } + current, getErr = resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, false) + if getErr != nil { + return getErr + } + patchBytes, patchObject, err = patchSimple(current, modified, info) + } + if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && force { + patchBytes, patchObject, err = deleteAndCreate(info, patchBytes) + } + + info.Refresh(patchObject, true) + + return nil +} + +func patchSimple(currentObj runtime.Object, modified []byte, info *resource.Info) ([]byte, runtime.Object, error) { + // Serialize the current configuration of the object from the server. + current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, currentObj) + if err != nil { + return nil, nil, fmt.Errorf("serializing current configuration. %s", err) + } + + // Retrieve the original configuration of the object from the annotation. + original, err := util.GetOriginalConfiguration(currentObj) + if err != nil { + return nil, nil, fmt.Errorf("retrieving original configuration. %s", err) + } + + var patchType types.PatchType + var patch []byte + var lookupPatchMeta strategicpatch.LookupPatchMeta + var schema oapi.Schema + + // Create the versioned struct from the type defined in the restmapping + // (which is the API version we'll be submitting the patch to) + versionedObject, err := scheme.Scheme.New(info.Mapping.GroupVersionKind) + + // DEBUG: + // fmt.Printf("Modified: %v\n", string(modified)) + // fmt.Printf("Current: %v\n", string(current)) + // fmt.Printf("Original: %v\n", string(original)) + // fmt.Printf("versionedObj: %v\n", versionedObject) + // fmt.Printf("Error: %+v\nIsNotRegisteredError: %t\n", err, runtime.IsNotRegisteredError(err)) + + switch { + case runtime.IsNotRegisteredError(err): + // fall back to generic JSON merge patch + patchType = types.MergePatchType + preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"), + mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")} + patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...) + if err != nil { + if mergepatch.IsPreconditionFailed(err) { + return nil, nil, fmt.Errorf("At least one of apiVersion, kind and name was changed") + } + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + case err != nil: + return nil, nil, fmt.Errorf("getting instance of versioned object. %s", err) + case err == nil: + // Compute a three way strategic merge patch to send to server. + patchType = types.StrategicMergePatchType + + // Try to use openapi first if the openapi spec is available and can successfully calculate the patch. + // Otherwise, fall back to baked-in types. + var openapiSchema openapi.Resources + if openapiSchema != nil { + if schema = openapiSchema.LookupResource(info.Mapping.GroupVersionKind); schema != nil { + lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema} + if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite); err == nil { + patchType = types.StrategicMergePatchType + patch = openapiPatch + // TODO: In case it's necessary to report warnings + // } else { + // log.Printf("Warning: error calculating patch from openapi spec: %s", err) + } + } + } + + if patch == nil { + lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject) + if err != nil { + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite) + if err != nil { + return nil, nil, fmt.Errorf("creating patch. %s", err) + } + } + } + + if string(patch) == "{}" { + return patch, currentObj, nil + } + + patchedObj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, patchType, patch, nil) + return patch, patchedObj, err +} + +func deleteAndCreate(info *resource.Info, modified []byte) ([]byte, runtime.Object, error) { + delOptions := defaultDeleteOptions() + if _, err := deleteWithOptions(info, delOptions); err != nil { + return nil, nil, err + } + + helper := resource.NewHelper(info.Client, info.Mapping) + + // TODO: make a waiter and use it + if err := wait.PollImmediate(1*time.Second, time.Duration(timeout), func() (bool, error) { + if _, err := helper.Get(info.Namespace, info.Name, false); !errors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return nil, nil, err + } + + // TODO: Check what GetModifiedConfiguration does, this could be an encode - decode waste of time + // modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme) + // if err != nil { + // return nil, nil, fmt.Errorf("retrieving modified configuration. %s", err) + // } + versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil) + if err != nil { + return nil, nil, err + } + + options := metav1.CreateOptions{} + createdObject, err := helper.Create(info.Namespace, true, versionedObject, &options) + if err != nil { + // restore the original object if we fail to create the new one + // but still propagate and advertise error to user + recreated, recreateErr := helper.Create(info.Namespace, true, info.Object, &options) + if recreateErr != nil { + err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one. %v.\n\nAdditionally, an error occurred attempting to restore the original object: %v", err, recreateErr) + } else { + createdObject = recreated + } + } + return modified, createdObject, err +} diff --git a/src/rsync/pkg/client/replace.go b/src/rsync/pkg/client/replace.go new file mode 100644 index 00000000..9aba8aca --- /dev/null +++ b/src/rsync/pkg/client/replace.go @@ -0,0 +1,51 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Based on Code: https://github.com/johandry/klient +package client + +import ( + "k8s.io/cli-runtime/pkg/resource" +) + +// Replace creates a resource with the given content +func (c *Client) Replace(content []byte) error { + r := c.ResultForContent(content, nil) + return c.ReplaceResource(r) +} + +// ReplaceFiles create the resource(s) from the given filenames (file, directory or STDIN) or HTTP URLs +func (c *Client) ReplaceFiles(filenames ...string) error { + r := c.ResultForFilenameParam(filenames, nil) + return c.ReplaceResource(r) +} + +// ReplaceResource applies the given resource. Create the resources with `ResultForFilenameParam` or `ResultForContent` +func (c *Client) ReplaceResource(r *resource.Result) error { + if err := r.Err(); err != nil { + return err + } + return r.Visit(replace) +} + +func replace(info *resource.Info, err error) error { + if err != nil { + return failedTo("replace", info, err) + } + + obj, err := resource.NewHelper(info.Client, info.Mapping).Replace(info.Namespace, info.Name, true, info.Object) + if err != nil { + return failedTo("replace", info, err) + } + info.Refresh(obj, true) + + return nil +} |