summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/client/patch.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rsync/pkg/client/patch.go')
-rw-r--r--src/rsync/pkg/client/patch.go216
1 files changed, 216 insertions, 0 deletions
diff --git a/src/rsync/pkg/client/patch.go b/src/rsync/pkg/client/patch.go
new file mode 100644
index 00000000..3a620f49
--- /dev/null
+++ b/src/rsync/pkg/client/patch.go
@@ -0,0 +1,216 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Based on Code: https://github.com/johandry/klient
+package client
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/jonboulle/clockwork"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/jsonmergepatch"
+ "k8s.io/apimachinery/pkg/util/mergepatch"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/cli-runtime/pkg/resource"
+ oapi "k8s.io/kube-openapi/pkg/util/proto"
+ "k8s.io/kubectl/pkg/scheme"
+ "k8s.io/kubectl/pkg/util"
+ "k8s.io/kubectl/pkg/util/openapi"
+)
+
+const (
+ // overwrite if true, automatically resolve conflicts between the modified and live configuration by using values from the modified configuration
+ overwrite = true
+ // maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
+ maxPatchRetry = 5
+ // backOffPeriod is the period to back off when apply patch results in error.
+ backOffPeriod = 1 * time.Second
+ // how many times we can retry before back off
+ triesBeforeBackOff = 1
+ // force if true, immediately remove resources from API and bypass graceful deletion. Note that immediate deletion of some resources may result in inconsistency or data loss and requires confirmation.
+ force = false
+ // timeout waiting for the resource to be delete if it needs to be recreated
+ timeout = 0
+)
+
+// patch tries to patch an OpenAPI resource
+func patch(info *resource.Info, current runtime.Object) error {
+ // From: k8s.io/kubectl/pkg/cmd/apply/apply.go & patcher.go
+ modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ if err != nil {
+ return fmt.Errorf("retrieving modified configuration. %s", err)
+ }
+
+ metadata, _ := meta.Accessor(current)
+ annotationMap := metadata.GetAnnotations()
+ if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok {
+ // TODO: Find what to do with the warnings, they should not be printed
+ fmt.Fprintf(os.Stderr, "Warning: apply should be used on resource created by apply")
+ }
+
+ patchBytes, patchObject, err := patchSimple(current, modified, info)
+
+ var getErr error
+ for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
+ if i > triesBeforeBackOff {
+ clockwork.NewRealClock().Sleep(backOffPeriod)
+ }
+ current, getErr = resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, false)
+ if getErr != nil {
+ return getErr
+ }
+ patchBytes, patchObject, err = patchSimple(current, modified, info)
+ }
+ if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && force {
+ patchBytes, patchObject, err = deleteAndCreate(info, patchBytes)
+ }
+
+ info.Refresh(patchObject, true)
+
+ return nil
+}
+
+func patchSimple(currentObj runtime.Object, modified []byte, info *resource.Info) ([]byte, runtime.Object, error) {
+ // Serialize the current configuration of the object from the server.
+ current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("serializing current configuration. %s", err)
+ }
+
+ // Retrieve the original configuration of the object from the annotation.
+ original, err := util.GetOriginalConfiguration(currentObj)
+ if err != nil {
+ return nil, nil, fmt.Errorf("retrieving original configuration. %s", err)
+ }
+
+ var patchType types.PatchType
+ var patch []byte
+ var lookupPatchMeta strategicpatch.LookupPatchMeta
+ var schema oapi.Schema
+
+ // Create the versioned struct from the type defined in the restmapping
+ // (which is the API version we'll be submitting the patch to)
+ versionedObject, err := scheme.Scheme.New(info.Mapping.GroupVersionKind)
+
+ // DEBUG:
+ // fmt.Printf("Modified: %v\n", string(modified))
+ // fmt.Printf("Current: %v\n", string(current))
+ // fmt.Printf("Original: %v\n", string(original))
+ // fmt.Printf("versionedObj: %v\n", versionedObject)
+ // fmt.Printf("Error: %+v\nIsNotRegisteredError: %t\n", err, runtime.IsNotRegisteredError(err))
+
+ switch {
+ case runtime.IsNotRegisteredError(err):
+ // fall back to generic JSON merge patch
+ patchType = types.MergePatchType
+ preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
+ mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
+ patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
+ if err != nil {
+ if mergepatch.IsPreconditionFailed(err) {
+ return nil, nil, fmt.Errorf("At least one of apiVersion, kind and name was changed")
+ }
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ case err != nil:
+ return nil, nil, fmt.Errorf("getting instance of versioned object. %s", err)
+ case err == nil:
+ // Compute a three way strategic merge patch to send to server.
+ patchType = types.StrategicMergePatchType
+
+ // Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
+ // Otherwise, fall back to baked-in types.
+ var openapiSchema openapi.Resources
+ if openapiSchema != nil {
+ if schema = openapiSchema.LookupResource(info.Mapping.GroupVersionKind); schema != nil {
+ lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
+ if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite); err == nil {
+ patchType = types.StrategicMergePatchType
+ patch = openapiPatch
+ // TODO: In case it's necessary to report warnings
+ // } else {
+ // log.Printf("Warning: error calculating patch from openapi spec: %s", err)
+ }
+ }
+ }
+
+ if patch == nil {
+ lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, overwrite)
+ if err != nil {
+ return nil, nil, fmt.Errorf("creating patch. %s", err)
+ }
+ }
+ }
+
+ if string(patch) == "{}" {
+ return patch, currentObj, nil
+ }
+
+ patchedObj, err := resource.NewHelper(info.Client, info.Mapping).Patch(info.Namespace, info.Name, patchType, patch, nil)
+ return patch, patchedObj, err
+}
+
+func deleteAndCreate(info *resource.Info, modified []byte) ([]byte, runtime.Object, error) {
+ delOptions := defaultDeleteOptions()
+ if _, err := deleteWithOptions(info, delOptions); err != nil {
+ return nil, nil, err
+ }
+
+ helper := resource.NewHelper(info.Client, info.Mapping)
+
+ // TODO: make a waiter and use it
+ if err := wait.PollImmediate(1*time.Second, time.Duration(timeout), func() (bool, error) {
+ if _, err := helper.Get(info.Namespace, info.Name, false); !errors.IsNotFound(err) {
+ return false, err
+ }
+ return true, nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // TODO: Check what GetModifiedConfiguration does, this could be an encode - decode waste of time
+ // modified, err := util.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
+ // if err != nil {
+ // return nil, nil, fmt.Errorf("retrieving modified configuration. %s", err)
+ // }
+ versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ options := metav1.CreateOptions{}
+ createdObject, err := helper.Create(info.Namespace, true, versionedObject, &options)
+ if err != nil {
+ // restore the original object if we fail to create the new one
+ // but still propagate and advertise error to user
+ recreated, recreateErr := helper.Create(info.Namespace, true, info.Object, &options)
+ if recreateErr != nil {
+ err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one. %v.\n\nAdditionally, an error occurred attempting to restore the original object: %v", err, recreateErr)
+ } else {
+ createdObject = recreated
+ }
+ }
+ return modified, createdObject, err
+}