From f9290a57d3ecdbbc48913eca33742029a0944cf6 Mon Sep 17 00:00:00 2001 From: hthieu Date: Wed, 4 Aug 2021 19:50:24 +0200 Subject: Update status check endpoint  MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit   Update status check endpoint to use helm (3.5) official implementation of resource status check. Move utils to new module and update import. Issue-ID: MULTICLOUD-1372 Signed-off-by: hthieu Change-Id: I57a827d09466f5f554c89c2fa5533696285f9c37 --- src/k8splugin/cmd/main.go | 2 +- src/k8splugin/go.mod | 2 + src/k8splugin/internal/app/client.go | 84 ++++ src/k8splugin/internal/app/client_test.go | 2 +- src/k8splugin/internal/app/instance.go | 90 +++- src/k8splugin/internal/app/instance_test.go | 2 +- src/k8splugin/internal/helm/helm.go | 3 +- src/k8splugin/internal/plugin/helpers.go | 2 +- src/k8splugin/internal/plugin/helpers_test.go | 2 +- src/k8splugin/internal/rb/archive.go | 3 +- src/k8splugin/internal/statuscheck/converter.go | 69 +++ src/k8splugin/internal/statuscheck/ready.go | 393 ++++++++++++++++ src/k8splugin/internal/statuscheck/ready_test.go | 517 +++++++++++++++++++++ src/k8splugin/internal/statuscheck/resource.go | 85 ++++ .../internal/statuscheck/resource_test.go | 61 +++ src/k8splugin/internal/statuscheck/wait.go | 109 +++++ src/k8splugin/internal/utils.go | 140 ------ src/k8splugin/internal/utils/deploymentutil.go | 178 +++++++ src/k8splugin/internal/utils/utils.go | 140 ++++++ src/k8splugin/internal/utils/utils_test.go | 93 ++++ src/k8splugin/internal/utils_test.go | 93 ---- src/k8splugin/plugins/generic/plugin.go | 2 +- src/k8splugin/plugins/namespace/plugin.go | 2 +- src/k8splugin/plugins/service/plugin.go | 2 +- 24 files changed, 1827 insertions(+), 249 deletions(-) create mode 100644 src/k8splugin/internal/statuscheck/converter.go create mode 100644 src/k8splugin/internal/statuscheck/ready.go create mode 100644 src/k8splugin/internal/statuscheck/ready_test.go create mode 100644 src/k8splugin/internal/statuscheck/resource.go create mode 100644 src/k8splugin/internal/statuscheck/resource_test.go create mode 100644 src/k8splugin/internal/statuscheck/wait.go delete mode 100644 src/k8splugin/internal/utils.go create mode 100644 src/k8splugin/internal/utils/deploymentutil.go create mode 100644 src/k8splugin/internal/utils/utils.go create mode 100644 src/k8splugin/internal/utils/utils_test.go delete mode 100644 src/k8splugin/internal/utils_test.go diff --git a/src/k8splugin/cmd/main.go b/src/k8splugin/cmd/main.go index c37354e0..ff00613e 100644 --- a/src/k8splugin/cmd/main.go +++ b/src/k8splugin/cmd/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "log" "math/rand" "net/http" @@ -24,7 +25,6 @@ import ( "time" "github.com/onap/multicloud-k8s/src/k8splugin/api" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/auth" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" diff --git a/src/k8splugin/go.mod b/src/k8splugin/go.mod index 9efd1905..671b64a9 100644 --- a/src/k8splugin/go.mod +++ b/src/k8splugin/go.mod @@ -45,7 +45,9 @@ require ( gopkg.in/yaml.v2 v2.3.0 helm.sh/helm/v3 v3.5.0 k8s.io/api v0.20.1 + k8s.io/apiextensions-apiserver v0.20.1 k8s.io/apimachinery v0.20.1 + k8s.io/cli-runtime v0.20.1 k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible rsc.io/letsencrypt v0.0.3 // indirect ) diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index 4c5f7e1c..623a8dc7 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -62,6 +62,90 @@ type ResourceStatus struct { Status unstructured.Unstructured `json:"status"` } +func (k *KubernetesClient) getRestApi(apiVersion string) (rest.Interface, error) { + //based on kubectl api-versions + switch apiVersion { + case "admissionregistration.k8s.io/v1": + return k.clientSet.AdmissionregistrationV1().RESTClient(), nil + case "admissionregistration.k8s.io/v1beta1": + return k.clientSet.AdmissionregistrationV1beta1().RESTClient(), nil + case "apps/v1": + return k.clientSet.AppsV1().RESTClient(), nil + case "apps/v1beta1": + return k.clientSet.AppsV1beta1().RESTClient(), nil + case "apps/v1beta2": + return k.clientSet.AppsV1beta2().RESTClient(), nil + case "authentication.k8s.io/v1": + return k.clientSet.AuthenticationV1().RESTClient(), nil + case "authentication.k8s.io/v1beta1": + return k.clientSet.AuthenticationV1beta1().RESTClient(), nil + case "authorization.k8s.io/v1": + return k.clientSet.AuthorizationV1().RESTClient(), nil + case "authorization.k8s.io/v1beta1": + return k.clientSet.AuthorizationV1beta1().RESTClient(), nil + case "autoscaling/v1": + return k.clientSet.AutoscalingV1().RESTClient(), nil + case "autoscaling/v2beta1": + return k.clientSet.AutoscalingV2beta1().RESTClient(), nil + case "autoscaling/v2beta2": + return k.clientSet.AutoscalingV2beta2().RESTClient(), nil + case "batch/v1": + return k.clientSet.BatchV1().RESTClient(), nil + case "batch/v1beta1": + return k.clientSet.BatchV1beta1().RESTClient(), nil + case "certificates.k8s.io/v1": + return k.clientSet.CertificatesV1().RESTClient(), nil + case "certificates.k8s.io/v1beta1": + return k.clientSet.CertificatesV1beta1().RESTClient(), nil + case "coordination.k8s.io/v1": + return k.clientSet.CoordinationV1().RESTClient(), nil + case "coordination.k8s.io/v1beta1": + return k.clientSet.CoordinationV1beta1().RESTClient(), nil + case "v1": + return k.clientSet.CoreV1().RESTClient(), nil + case "discovery.k8s.io/v1beta1": + return k.clientSet.DiscoveryV1beta1().RESTClient(), nil + case "events.k8s.io/v1": + return k.clientSet.EventsV1().RESTClient(), nil + case "events.k8s.io/v1beta1": + return k.clientSet.EventsV1beta1().RESTClient(), nil + case "extensions/v1beta1": + return k.clientSet.ExtensionsV1beta1().RESTClient(), nil + case "flowcontrol.apiserver.k8s.io/v1alpha1": + return k.clientSet.FlowcontrolV1alpha1().RESTClient(), nil + case "networking.k8s.io/v1": + return k.clientSet.NetworkingV1().RESTClient(), nil + case "networking.k8s.io/v1beta1": + return k.clientSet.NetworkingV1beta1().RESTClient(), nil + case "node.k8s.io/v1alpha1": + return k.clientSet.NodeV1alpha1().RESTClient(), nil + case "node.k8s.io/v1beta1": + return k.clientSet.NodeV1beta1().RESTClient(), nil + case "policy/v1beta1": + return k.clientSet.PolicyV1beta1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1": + return k.clientSet.RbacV1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1alpha1": + return k.clientSet.RbacV1alpha1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1beta1": + return k.clientSet.RbacV1beta1().RESTClient(), nil + case "scheduling.k8s.io/v1": + return k.clientSet.SchedulingV1().RESTClient(), nil + case "scheduling.k8s.io/v1alpha1": + return k.clientSet.SchedulingV1alpha1().RESTClient(), nil + case "scheduling.k8s.io/v1beta1": + return k.clientSet.SchedulingV1beta1().RESTClient(), nil + case "storage.k8s.io/v1": + return k.clientSet.StorageV1().RESTClient(), nil + case "storage.k8s.io/v1alpha1": + return k.clientSet.StorageV1alpha1().RESTClient(), nil + case "storage.k8s.io/v1beta1": + return k.clientSet.StorageV1beta1().RESTClient(), nil + default: + return nil, pkgerrors.New("Api version " + apiVersion + " unknown") + } +} + // getPodsByLabel yields status of all pods under given instance ID func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) { client := k.GetStandardClient().CoreV1().Pods(namespace) diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go index 6db541a4..0ba244d2 100644 --- a/src/k8splugin/internal/app/client_test.go +++ b/src/k8splugin/internal/app/client_test.go @@ -15,13 +15,13 @@ package app import ( "encoding/base64" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "os" "plugin" "reflect" "testing" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index 01d924f9..caf756a0 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -19,14 +19,25 @@ package app import ( + "context" "encoding/json" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" "log" "strings" + "time" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator" "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck" pkgerrors "github.com/pkg/errors" ) @@ -282,25 +293,34 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) } + isReady := true generalStatus := make([]ResourceStatus, 0, len(resResp.Resources)) Main: - for _, resource := range resResp.Resources { + for _, oneResource := range resResp.Resources { for _, pod := range podsStatus { - if resource.GVK == pod.GVK && resource.Name == pod.Name { + if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name { continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart } } - status, err := k8sClient.GetResourceStatus(resource, resResp.Namespace) + status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace) if err != nil { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) } else { generalStatus = append(generalStatus, status) } + + ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status) + + if !ready || err != nil { + isReady = false + cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) + break + } } resp := InstanceStatus{ Request: resResp.Request, ResourceCount: int32(len(generalStatus) + len(podsStatus)), - Ready: false, //FIXME To determine readiness, some parsing of status fields is necessary + Ready: isReady, //FIXME To determine readiness, some parsing of status fields is necessary ResourcesStatus: append(generalStatus, podsStatus...), } @@ -314,6 +334,68 @@ Main: return resp, nil } +func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error){ + readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second) + defer cancel() + + apiVersion, kind := rss.GVK.ToAPIVersionAndKind() + log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind) + restClient, err := k8sClient.getRestApi(apiVersion) + if err != nil { + return false, err + } + mapper := k8sClient.GetMapper() + mapping, err := mapper.RESTMapping(schema.GroupKind{ + Group: rss.GVK.Group, + Kind: rss.GVK.Kind, + }, rss.GVK.Version) + resourceInfo := resource.Info{ + Client: restClient, + Mapping: mapping, + Namespace: namespace, + Name: rss.Name, + Source: "", + Object: nil, + ResourceVersion: "", + } + + var parsedRes runtime.Object + //TODO: Should we care about different api version for a same kind? + switch kind { + case "Pod": + parsedRes = new(corev1.Pod) + case "Job": + parsedRes = new(batchv1.Job) + case "Deployment": + parsedRes = new(appsv1.Deployment) + case "PersistentVolumeClaim": + parsedRes = new(corev1.PersistentVolume) + case "Service": + parsedRes = new(corev1.Service) + case "DaemonSet": + parsedRes = new(appsv1.DaemonSet) + case "CustomResourceDefinition": + parsedRes = new(apiextv1.CustomResourceDefinition) + case "StatefulSet": + parsedRes = new(appsv1.StatefulSet) + case "ReplicationController": + parsedRes = new(corev1.ReplicationController) + case "ReplicaSet": + parsedRes = new(appsv1.ReplicaSet) + default: + //For not listed resource, consider ready + return true, nil + } + err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes) + if err != nil { + return false, err + } + resourceInfo.Object = parsedRes + ready, err := readyChecker.IsReady(ctx, &resourceInfo) + return ready, err +} + // List returns the instance for corresponding ID // Empty string returns all func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) { diff --git a/src/k8splugin/internal/app/instance_test.go b/src/k8splugin/internal/app/instance_test.go index 2711a52f..099e3b7f 100644 --- a/src/k8splugin/internal/app/instance_test.go +++ b/src/k8splugin/internal/app/instance_test.go @@ -15,13 +15,13 @@ package app import ( "encoding/base64" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "log" "reflect" "sort" "testing" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" diff --git a/src/k8splugin/internal/helm/helm.go b/src/k8splugin/internal/helm/helm.go index 3c25ac8c..849674a9 100644 --- a/src/k8splugin/internal/helm/helm.go +++ b/src/k8splugin/internal/helm/helm.go @@ -19,14 +19,13 @@ package helm import ( "fmt" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "os" "path/filepath" "regexp" "strings" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" - pkgerrors "github.com/pkg/errors" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart/loader" diff --git a/src/k8splugin/internal/plugin/helpers.go b/src/k8splugin/internal/plugin/helpers.go index 7078b813..98a2f5c8 100644 --- a/src/k8splugin/internal/plugin/helpers.go +++ b/src/k8splugin/internal/plugin/helpers.go @@ -17,10 +17,10 @@ package plugin import ( + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "log" "strings" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" diff --git a/src/k8splugin/internal/plugin/helpers_test.go b/src/k8splugin/internal/plugin/helpers_test.go index b968072f..34faf9a5 100644 --- a/src/k8splugin/internal/plugin/helpers_test.go +++ b/src/k8splugin/internal/plugin/helpers_test.go @@ -17,11 +17,11 @@ package plugin import ( + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "testing" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" ) diff --git a/src/k8splugin/internal/rb/archive.go b/src/k8splugin/internal/rb/archive.go index 267c7cd2..c70dfd6c 100644 --- a/src/k8splugin/internal/rb/archive.go +++ b/src/k8splugin/internal/rb/archive.go @@ -19,13 +19,12 @@ package rb import ( "archive/tar" "compress/gzip" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" pkgerrors "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" - - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" ) func isTarGz(r io.Reader) error { diff --git a/src/k8splugin/internal/statuscheck/converter.go b/src/k8splugin/internal/statuscheck/converter.go new file mode 100644 index 00000000..8f411c41 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/converter.go @@ -0,0 +1,69 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "sync" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes/scheme" +) + +var k8sNativeScheme *runtime.Scheme +var k8sNativeSchemeOnce sync.Once + +// AsVersioned converts the given info into a runtime.Object with the correct +// group and version set +func AsVersioned(info *resource.Info) runtime.Object { + return convertWithMapper(info.Object, info.Mapping) +} + +// convertWithMapper converts the given object with the optional provided +// RESTMapping. If no mapping is provided, the default schema versioner is used +func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object { + s := kubernetesNativeScheme() + var gv = runtime.GroupVersioner(schema.GroupVersions(s.PrioritizedVersionsAllGroups())) + if mapping != nil { + gv = mapping.GroupVersionKind.GroupVersion() + } + if obj, err := runtime.ObjectConvertor(s).ConvertToVersion(obj, gv); err == nil { + return obj + } + return obj +} + +// kubernetesNativeScheme returns a clean *runtime.Scheme with _only_ Kubernetes +// native resources added to it. This is required to break free of custom resources +// that may have been added to scheme.Scheme due to Helm being used as a package in +// combination with e.g. a versioned kube client. If we would not do this, the client +// may attempt to perform e.g. a 3-way-merge strategy patch for custom resources. +func kubernetesNativeScheme() *runtime.Scheme { + k8sNativeSchemeOnce.Do(func() { + k8sNativeScheme = runtime.NewScheme() + scheme.AddToScheme(k8sNativeScheme) + // API extensions are not in the above scheme set, + // and must thus be added separately. + apiextensionsv1beta1.AddToScheme(k8sNativeScheme) + apiextensionsv1.AddToScheme(k8sNativeScheme) + }) + return k8sNativeScheme +} diff --git a/src/k8splugin/internal/statuscheck/ready.go b/src/k8splugin/internal/statuscheck/ready.go new file mode 100644 index 00000000..7bea536a --- /dev/null +++ b/src/k8splugin/internal/statuscheck/ready.go @@ -0,0 +1,393 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "log" +) + +// ReadyCheckerOption is a function that configures a ReadyChecker. +type ReadyCheckerOption func(*ReadyChecker) + +// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker +// to consider paused resources to be ready. For example a Deployment +// with spec.paused equal to true would be considered ready. +func PausedAsReady(pausedAsReady bool) ReadyCheckerOption { + return func(c *ReadyChecker) { + c.pausedAsReady = pausedAsReady + } +} + +// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker +// to consider readiness of Job resources. +func CheckJobs(checkJobs bool) ReadyCheckerOption { + return func(c *ReadyChecker) { + c.checkJobs = checkJobs + } +} + +// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can +// be used to override defaults. +func NewReadyChecker(cl kubernetes.Interface, opts ...ReadyCheckerOption) ReadyChecker { + c := ReadyChecker{ + client: cl, + } + + for _, opt := range opts { + opt(&c) + } + + return c +} + +// ReadyChecker is a type that can check core Kubernetes types for readiness. +type ReadyChecker struct { + client kubernetes.Interface + checkJobs bool + pausedAsReady bool +} + +// IsReady checks if v is ready. It supports checking readiness for pods, +// deployments, persistent volume claims, services, daemon sets, custom +// resource definitions, stateful sets, replication controllers, and replica +// sets. All other resource kinds are always considered ready. +// +// IsReady will fetch the latest state of the object from the server prior to +// performing readiness checks, and it will return any error encountered. +func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) { + var ( + // This defaults to true, otherwise we get to a point where + // things will always return false unless one of the objects + // that manages pods has been hit + ok = true + err error + ) + switch value := AsVersioned(v).(type) { + case *corev1.Pod: + pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil || !c.isPodReady(pod) { + return false, err + } + case *batchv1.Job: + if c.checkJobs { + job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil || !c.jobReady(job) { + return false, err + } + } + case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment: + currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + // If paused deployment will never be ready + if currentDeployment.Spec.Paused { + return c.pausedAsReady, nil + } + // Find RS associated with deployment + newReplicaSet, err := utils.GetNewReplicaSet(currentDeployment, c.client.AppsV1()) + if err != nil || newReplicaSet == nil { + return false, err + } + if !c.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil + } + case *corev1.PersistentVolumeClaim: + claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.volumeReady(claim) { + return false, nil + } + case *corev1.Service: + svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.serviceReady(svc) { + return false, nil + } + case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet: + ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.daemonSetReady(ds) { + return false, nil + } + case *apiextv1beta1.CustomResourceDefinition: + if err := v.Get(); err != nil { + return false, err + } + crd := &apiextv1beta1.CustomResourceDefinition{} + if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil { + return false, err + } + if !c.crdBetaReady(*crd) { + return false, nil + } + case *apiextv1.CustomResourceDefinition: + if err := v.Get(); err != nil { + return false, err + } + crd := &apiextv1.CustomResourceDefinition{} + if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil { + return false, err + } + if !c.crdReady(*crd) { + return false, nil + } + case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet: + sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.statefulSetReady(sts) { + return false, nil + } + case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet: + ok, err = c.podsReadyForObject(ctx, v.Namespace, value) + } + if !ok || err != nil { + return false, err + } + return true, nil +} + +func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) { + pods, err := c.podsforObject(ctx, namespace, obj) + if err != nil { + return false, err + } + for _, pod := range pods { + if !c.isPodReady(&pod) { + return false, nil + } + } + return true, nil +} + +func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) { + selector, err := SelectorsForObject(obj) + if err != nil { + return nil, err + } + list, err := getPods(ctx, c.client, namespace, selector.String()) + return list, err +} + +// isPodReady returns true if a pod is ready; false otherwise. +func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return true + } + } + log.Printf("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) + return false +} + +func (c *ReadyChecker) jobReady(job *batchv1.Job) bool { + if job.Status.Failed > *job.Spec.BackoffLimit { + log.Printf("Job is failed: %s/%s", job.GetNamespace(), job.GetName()) + return false + } + if job.Status.Succeeded < *job.Spec.Completions { + log.Printf("Job is not completed: %s/%s", job.GetNamespace(), job.GetName()) + return false + } + return true +} + +func (c *ReadyChecker) serviceReady(s *corev1.Service) bool { + // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set) + if s.Spec.Type == corev1.ServiceTypeExternalName { + return true + } + + // Ensure that the service cluster IP is not empty + if s.Spec.ClusterIP == "" { + log.Printf("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName()) + return false + } + + // This checks if the service has a LoadBalancer and that balancer has an Ingress defined + if s.Spec.Type == corev1.ServiceTypeLoadBalancer { + // do not wait when at least 1 external IP is set + if len(s.Spec.ExternalIPs) > 0 { + log.Printf("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs) + return true + } + + if s.Status.LoadBalancer.Ingress == nil { + log.Printf("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName()) + return false + } + } + + return true +} + +func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool { + if v.Status.Phase != corev1.ClaimBound { + log.Printf("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName()) + return false + } + return true +} + +func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool { + expectedReady := *dep.Spec.Replicas - utils.MaxUnavailable(*dep) + if !(rs.Status.ReadyReplicas >= expectedReady) { + log.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady) + return false + } + return true +} + +func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool { + // If the update strategy is not a rolling update, there will be nothing to wait for + if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType { + return true + } + + // Make sure all the updated pods have been scheduled + if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled { + log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled) + return false + } + maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true) + if err != nil { + // If for some reason the value is invalid, set max unavailable to the + // number of desired replicas. This is the same behavior as the + // `MaxUnavailable` function in deploymentutil + maxUnavailable = int(ds.Status.DesiredNumberScheduled) + } + + expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable + if !(int(ds.Status.NumberReady) >= expectedReady) { + log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady) + return false + } + return true +} + +// Because the v1 extensions API is not available on all supported k8s versions +// yet and because Go doesn't support generics, we need to have a duplicate +// function to support the v1beta1 types +func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool { + for _, cond := range crd.Status.Conditions { + switch cond.Type { + case apiextv1beta1.Established: + if cond.Status == apiextv1beta1.ConditionTrue { + return true + } + case apiextv1beta1.NamesAccepted: + if cond.Status == apiextv1beta1.ConditionFalse { + // This indicates a naming conflict, but it's probably not the + // job of this function to fail because of that. Instead, + // we treat it as a success, since the process should be able to + // continue. + return true + } + } + } + return false +} + +func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool { + for _, cond := range crd.Status.Conditions { + switch cond.Type { + case apiextv1.Established: + if cond.Status == apiextv1.ConditionTrue { + return true + } + case apiextv1.NamesAccepted: + if cond.Status == apiextv1.ConditionFalse { + // This indicates a naming conflict, but it's probably not the + // job of this function to fail because of that. Instead, + // we treat it as a success, since the process should be able to + // continue. + return true + } + } + } + return false +} + +func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool { + // If the update strategy is not a rolling update, there will be nothing to wait for + if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return true + } + + // Dereference all the pointers because StatefulSets like them + var partition int + // 1 is the default for replicas if not set + var replicas = 1 + // For some reason, even if the update strategy is a rolling update, the + // actual rollingUpdate field can be nil. If it is, we can safely assume + // there is no partition value + if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) + } + if sts.Spec.Replicas != nil { + replicas = int(*sts.Spec.Replicas) + } + + // Because an update strategy can use partitioning, we need to calculate the + // number of updated replicas we should have. For example, if the replicas + // is set to 3 and the partition is 2, we'd expect only one pod to be + // updated + expectedReplicas := replicas - partition + + // Make sure all the updated pods have been scheduled + if int(sts.Status.UpdatedReplicas) != expectedReplicas { + log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas) + return false + } + + if int(sts.Status.ReadyReplicas) != replicas { + log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas) + return false + } + return true +} + +func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) { + list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: selector, + }) + return list.Items, err +} diff --git a/src/k8splugin/internal/statuscheck/ready_test.go b/src/k8splugin/internal/statuscheck/ready_test.go new file mode 100644 index 00000000..e1db16f4 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/ready_test.go @@ -0,0 +1,517 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "testing" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" +) + +const defaultNamespace = metav1.NamespaceDefault + +func Test_ReadyChecker_deploymentReady(t *testing.T) { + type args struct { + rs *appsv1.ReplicaSet + dep *appsv1.Deployment + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "deployment is ready", + args: args{ + rs: newReplicaSet("foo", 1, 1), + dep: newDeployment("foo", 1, 1, 0), + }, + want: true, + }, + { + name: "deployment is not ready", + args: args{ + rs: newReplicaSet("foo", 0, 0), + dep: newDeployment("foo", 1, 1, 0), + }, + want: false, + }, + { + name: "deployment is ready when maxUnavailable is set", + args: args{ + rs: newReplicaSet("foo", 2, 1), + dep: newDeployment("foo", 2, 1, 1), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want { + t.Errorf("deploymentReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_daemonSetReady(t *testing.T) { + type args struct { + ds *appsv1.DaemonSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "daemonset is ready", + args: args{ + ds: newDaemonSet("foo", 0, 1, 1, 1), + }, + want: true, + }, + { + name: "daemonset is not ready", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 1), + }, + want: false, + }, + { + name: "daemonset pods have not been scheduled successfully", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 0), + }, + want: false, + }, + { + name: "daemonset is ready when maxUnavailable is set", + args: args{ + ds: newDaemonSet("foo", 1, 1, 2, 2), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.daemonSetReady(tt.args.ds); got != tt.want { + t.Errorf("daemonSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_statefulSetReady(t *testing.T) { + type args struct { + sts *appsv1.StatefulSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "statefulset is ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 1, 1), + }, + want: true, + }, + { + name: "statefulset is not ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 0, 1), + }, + want: false, + }, + { + name: "statefulset is ready when partition is specified", + args: args{ + sts: newStatefulSet("foo", 2, 1, 2, 1), + }, + want: true, + }, + { + name: "statefulset is not ready when partition is set", + args: args{ + sts: newStatefulSet("foo", 1, 1, 1, 1), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.statefulSetReady(tt.args.sts); got != tt.want { + t.Errorf("statefulSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_podsReadyForObject(t *testing.T) { + type args struct { + namespace string + obj runtime.Object + } + tests := []struct { + name string + args args + existPods []corev1.Pod + want bool + wantErr bool + }{ + { + name: "pods ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionTrue), + }, + want: true, + wantErr: false, + }, + { + name: "pods not ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionFalse), + }, + want: false, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + for _, pod := range tt.existPods { + if _, err := c.client.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create Pod error: %v", err) + return + } + } + got, err := c.podsReadyForObject(context.TODO(), tt.args.namespace, tt.args.obj) + if (err != nil) != tt.wantErr { + t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_jobReady(t *testing.T) { + type args struct { + job *batchv1.Job + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "job is completed", + args: args{job: newJob("foo", 1, 1, 1, 0)}, + want: true, + }, + { + name: "job is incomplete", + args: args{job: newJob("foo", 1, 1, 0, 0)}, + want: false, + }, + { + name: "job is failed", + args: args{job: newJob("foo", 1, 1, 0, 1)}, + want: false, + }, + { + name: "job is completed with retry", + args: args{job: newJob("foo", 1, 1, 1, 1)}, + want: true, + }, + { + name: "job is failed with retry", + args: args{job: newJob("foo", 1, 1, 0, 2)}, + want: false, + }, + { + name: "job is completed single run", + args: args{job: newJob("foo", 0, 1, 1, 0)}, + want: true, + }, + { + name: "job is failed single run", + args: args{job: newJob("foo", 0, 1, 0, 1)}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.jobReady(tt.args.job); got != tt.want { + t.Errorf("jobReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_volumeReady(t *testing.T) { + type args struct { + v *corev1.PersistentVolumeClaim + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "pvc is bound", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimBound), + }, + want: true, + }, + { + name: "pvc is not ready", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimPending), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.volumeReady(tt.args.v); got != tt.want { + t.Errorf("volumeReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + }, + }, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: int32(desiredNumberScheduled), + NumberReady: int32(numberReady), + UpdatedNumberScheduled: int32(updatedNumberScheduled), + }, + } +} + +func newStatefulSet(name string, replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.StatefulSetSpec{ + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: intToInt32(partition), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.StatefulSetStatus{ + UpdatedReplicas: int32(updatedReplicas), + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DeploymentSpec{ + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + } +} + +func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet { + d := newDeployment(name, replicas, 0, 0) + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: d.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())}, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: d.Spec.Selector, + Replicas: intToInt32(replicas), + Template: d.Spec.Template, + }, + Status: appsv1.ReplicaSetStatus{ + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: podReadyCondition, + }, + }, + }, + } +} + +func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: phase, + }, + } +} + +func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: intToInt32(backoffLimit), + Completions: intToInt32(completions), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: batchv1.JobStatus{ + Succeeded: int32(succeeded), + Failed: int32(failed), + }, + } +} + +func intToInt32(i int) *int32 { + i32 := int32(i) + return &i32 +} diff --git a/src/k8splugin/internal/statuscheck/resource.go b/src/k8splugin/internal/statuscheck/resource.go new file mode 100644 index 00000000..598af2fb --- /dev/null +++ b/src/k8splugin/internal/statuscheck/resource.go @@ -0,0 +1,85 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import "k8s.io/cli-runtime/pkg/resource" + +// ResourceList provides convenience methods for comparing collections of Infos. +type ResourceList []*resource.Info + +// Append adds an Info to the Result. +func (r *ResourceList) Append(val *resource.Info) { + *r = append(*r, val) +} + +// Visit implements resource.Visitor. +func (r ResourceList) Visit(fn resource.VisitorFunc) error { + for _, i := range r { + if err := fn(i, nil); err != nil { + return err + } + } + return nil +} + +// Filter returns a new Result with Infos that satisfy the predicate fn. +func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList { + var result ResourceList + for _, i := range r { + if fn(i) { + result.Append(i) + } + } + return result +} + +// Get returns the Info from the result that matches the name and kind. +func (r ResourceList) Get(info *resource.Info) *resource.Info { + for _, i := range r { + if isMatchingInfo(i, info) { + return i + } + } + return nil +} + +// Contains checks to see if an object exists. +func (r ResourceList) Contains(info *resource.Info) bool { + for _, i := range r { + if isMatchingInfo(i, info) { + return true + } + } + return false +} + +// Difference will return a new Result with objects not contained in rs. +func (r ResourceList) Difference(rs ResourceList) ResourceList { + return r.Filter(func(info *resource.Info) bool { + return !rs.Contains(info) + }) +} + +// Intersect will return a new Result with objects contained in both Results. +func (r ResourceList) Intersect(rs ResourceList) ResourceList { + return r.Filter(rs.Contains) +} + +// isMatchingInfo returns true if infos match on Name and GroupVersionKind. +func isMatchingInfo(a, b *resource.Info) bool { + return a.Name == b.Name && a.Namespace == b.Namespace && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind +} diff --git a/src/k8splugin/internal/statuscheck/resource_test.go b/src/k8splugin/internal/statuscheck/resource_test.go new file mode 100644 index 00000000..532cebfd --- /dev/null +++ b/src/k8splugin/internal/statuscheck/resource_test.go @@ -0,0 +1,61 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" +) + +func TestResourceList(t *testing.T) { + mapping := &meta.RESTMapping{ + Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"}, + } + + info := func(name string) *resource.Info { + return &resource.Info{Name: name, Mapping: mapping} + } + + var r1, r2 ResourceList + r1 = []*resource.Info{info("foo"), info("bar")} + r2 = []*resource.Info{info("bar")} + + if r1.Get(info("bar")).Mapping.Resource.Resource != "pod" { + t.Error("expected get pod") + } + + diff := r1.Difference(r2) + if len(diff) != 1 { + t.Error("expected 1 result") + } + + if !diff.Contains(info("foo")) { + t.Error("expected diff to return foo") + } + + inter := r1.Intersect(r2) + if len(inter) != 1 { + t.Error("expected 1 result") + } + + if !inter.Contains(info("bar")) { + t.Error("expected intersect to return bar") + } +} diff --git a/src/k8splugin/internal/statuscheck/wait.go b/src/k8splugin/internal/statuscheck/wait.go new file mode 100644 index 00000000..41d90d91 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/wait.go @@ -0,0 +1,109 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + + "k8s.io/apimachinery/pkg/util/wait" +) + +type waiter struct { + c ReadyChecker + timeout time.Duration + log func(string, ...interface{}) +} + +// waitForResources polls to get the current status of all pods, PVCs, Services and +// Jobs(optional) until all are ready or a timeout is reached +func (w *waiter) waitForResources(created ResourceList) error { + w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) + + ctx, cancel := context.WithTimeout(context.Background(), w.timeout) + defer cancel() + + return wait.PollImmediateUntil(2*time.Second, func() (bool, error) { + for _, v := range created { + ready, err := w.c.IsReady(ctx, v) + if !ready || err != nil { + return false, err + } + } + return true, nil + }, ctx.Done()) +} + +// SelectorsForObject returns the pod label selector for a given object +// +// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84 +func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) { + switch t := object.(type) { + case *extensionsv1beta1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.ReplicationController: + selector = labels.SelectorFromSet(t.Spec.Selector) + case *appsv1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *batchv1.Job: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.Service: + if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 { + return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name) + } + selector = labels.SelectorFromSet(t.Spec.Selector) + + default: + return nil, fmt.Errorf("selector for %T not implemented", object) + } + + return selector, errors.Wrap(err, "invalid label selector") +} diff --git a/src/k8splugin/internal/utils.go b/src/k8splugin/internal/utils.go deleted file mode 100644 index 174f8e79..00000000 --- a/src/k8splugin/internal/utils.go +++ /dev/null @@ -1,140 +0,0 @@ -/* -Copyright 2018 Intel Corporation. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package utils - -import ( - "io/ioutil" - "log" - "os" - "path" - "path/filepath" - "plugin" - "strings" - - "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" - - pkgerrors "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/scheme" -) - -// LoadedPlugins stores references to the stored plugins -var LoadedPlugins = map[string]*plugin.Plugin{} - -const ResourcesListLimit = 10 - -// ResourceData stores all supported Kubernetes plugin types -type ResourceData struct { - YamlFilePath string - Namespace string - VnfId string -} - -// DecodeYAML reads a YAMl file to extract the Kubernetes object definition -func DecodeYAML(path string, into runtime.Object) (runtime.Object, error) { - if _, err := os.Stat(path); err != nil { - if os.IsNotExist(err) { - return nil, pkgerrors.New("File " + path + " not found") - } else { - return nil, pkgerrors.Wrap(err, "Stat file error") - } - } - - rawBytes, err := ioutil.ReadFile(path) - if err != nil { - return nil, pkgerrors.Wrap(err, "Read YAML file error") - } - - decode := scheme.Codecs.UniversalDeserializer().Decode - obj, _, err := decode(rawBytes, nil, into) - if err != nil { - return nil, pkgerrors.Wrap(err, "Deserialize YAML error") - } - - return obj, nil -} - -// CheckDatabaseConnection checks if the database is up and running and -// plugin can talk to it -func CheckDatabaseConnection() error { - err := db.CreateDBClient(config.GetConfiguration().DatabaseType) - if err != nil { - return pkgerrors.Cause(err) - } - - err = db.DBconn.HealthCheck() - if err != nil { - return pkgerrors.Cause(err) - } - // TODO Convert these to configuration files instead of environment variables. - c := db.EtcdConfig{ - Endpoint: config.GetConfiguration().EtcdIP, - CertFile: config.GetConfiguration().EtcdCert, - KeyFile: config.GetConfiguration().EtcdKey, - CAFile: config.GetConfiguration().EtcdCAFile, - } - err = db.NewEtcdClient(nil, c) - if err != nil { - log.Printf("Etcd Client Initialization failed with error: %s", err.Error()) - } - return nil -} - -// LoadPlugins loads all the compiled .so plugins -func LoadPlugins() error { - pluginsDir := config.GetConfiguration().PluginDir - err := filepath.Walk(pluginsDir, - func(path string, info os.FileInfo, err error) error { - if strings.Contains(path, ".so") { - p, err := plugin.Open(path) - if err != nil { - return pkgerrors.Cause(err) - } - LoadedPlugins[info.Name()[:len(info.Name())-3]] = p - } - return err - }) - if err != nil { - return err - } - - return nil -} - -// CheckInitialSettings is used to check initial settings required to start api -func CheckInitialSettings() error { - err := CheckDatabaseConnection() - if err != nil { - return pkgerrors.Cause(err) - } - - err = LoadPlugins() - if err != nil { - return pkgerrors.Cause(err) - } - - return nil -} - -//EnsureDirectory makes sure that the directories specified in the path exist -//If not, it will create them, if possible. -func EnsureDirectory(f string) error { - base := path.Dir(f) - _, err := os.Stat(base) - if err != nil && !os.IsNotExist(err) { - return err - } - return os.MkdirAll(base, 0755) -} diff --git a/src/k8splugin/internal/utils/deploymentutil.go b/src/k8splugin/internal/utils/deploymentutil.go new file mode 100644 index 00000000..b5159c41 --- /dev/null +++ b/src/k8splugin/internal/utils/deploymentutil.go @@ -0,0 +1,178 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "sort" + + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + appsclient "k8s.io/client-go/kubernetes/typed/apps/v1" +) + +// deploymentutil contains a copy of a few functions from Kubernetes controller code to avoid a dependency on k8s.io/kubernetes. +// This code is copied from https://github.com/kubernetes/kubernetes/blob/e856613dd5bb00bcfaca6974431151b5c06cbed5/pkg/controller/deployment/util/deployment_util.go +// No changes to the code were made other than removing some unused functions + +// RsListFunc returns the ReplicaSet from the ReplicaSet namespace and the List metav1.ListOptions. +type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error) + +// ListReplicaSets returns a slice of RSes the given deployment targets. +// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan), +// because only the controller itself should do that. +// However, it does filter out anything whose ControllerRef doesn't match. +func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) { + // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector + // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830. + namespace := deployment.Namespace + selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := metav1.ListOptions{LabelSelector: selector.String()} + all, err := getRSList(namespace, options) + if err != nil { + return nil, err + } + // Only include those whose ControllerRef matches the Deployment. + owned := make([]*apps.ReplicaSet, 0, len(all)) + for _, rs := range all { + if metav1.IsControlledBy(rs, deployment) { + owned = append(owned, rs) + } + } + return owned, nil +} + +// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker. +type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet + +func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) } +func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) +} + +// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template). +func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet { + sort.Sort(ReplicaSetsByCreationTimestamp(rsList)) + for i := range rsList { + if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) { + // In rare cases, such as after cluster upgrades, Deployment may end up with + // having more than one new ReplicaSets that have the same template as its template, + // see https://github.com/kubernetes/kubernetes/issues/40415 + // We deterministically choose the oldest new ReplicaSet. + return rsList[i] + } + } + // new ReplicaSet does not exist. + return nil +} + +// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash] +// We ignore pod-template-hash because: +// 1. The hash result would be different upon podTemplateSpec API changes +// (e.g. the addition of a new field will cause the hash code to change) +// 2. The deployment template won't have hash labels +func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool { + t1Copy := template1.DeepCopy() + t2Copy := template2.DeepCopy() + // Remove hash labels from template.Labels before comparing + delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) + delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) + return apiequality.Semantic.DeepEqual(t1Copy, t2Copy) +} + +// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface. +// Returns nil if the new replica set doesn't exist yet. +func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) { + rsList, err := ListReplicaSets(deployment, RsListFromClient(c)) + if err != nil { + return nil, err + } + return FindNewReplicaSet(deployment, rsList), nil +} + +// RsListFromClient returns an rsListFunc that wraps the given client. +func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc { + return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) { + rsList, err := c.ReplicaSets(namespace).List(context.Background(), options) + if err != nil { + return nil, err + } + var ret []*apps.ReplicaSet + for i := range rsList.Items { + ret = append(ret, &rsList.Items[i]) + } + return ret, err + } +} + +// IsRollingUpdate returns true if the strategy type is a rolling update. +func IsRollingUpdate(deployment *apps.Deployment) bool { + return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType +} + +// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take. +func MaxUnavailable(deployment apps.Deployment) int32 { + if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 { + return int32(0) + } + // Error caught by validation + _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) + if maxUnavailable > *deployment.Spec.Replicas { + return *deployment.Spec.Replicas + } + return maxUnavailable +} + +// ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one +// step. For example: +// +// 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1) +// 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1) +// 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) +// 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1) +// 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) +// 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1) +func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) { + surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true) + if err != nil { + return 0, 0, err + } + unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false) + if err != nil { + return 0, 0, err + } + + if surge == 0 && unavailable == 0 { + // Validation should never allow the user to explicitly use zero values for both maxSurge + // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero. + // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the + // theory that surge might not work due to quota. + unavailable = 1 + } + + return int32(surge), int32(unavailable), nil +} diff --git a/src/k8splugin/internal/utils/utils.go b/src/k8splugin/internal/utils/utils.go new file mode 100644 index 00000000..174f8e79 --- /dev/null +++ b/src/k8splugin/internal/utils/utils.go @@ -0,0 +1,140 @@ +/* +Copyright 2018 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "plugin" + "strings" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" +) + +// LoadedPlugins stores references to the stored plugins +var LoadedPlugins = map[string]*plugin.Plugin{} + +const ResourcesListLimit = 10 + +// ResourceData stores all supported Kubernetes plugin types +type ResourceData struct { + YamlFilePath string + Namespace string + VnfId string +} + +// DecodeYAML reads a YAMl file to extract the Kubernetes object definition +func DecodeYAML(path string, into runtime.Object) (runtime.Object, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return nil, pkgerrors.New("File " + path + " not found") + } else { + return nil, pkgerrors.Wrap(err, "Stat file error") + } + } + + rawBytes, err := ioutil.ReadFile(path) + if err != nil { + return nil, pkgerrors.Wrap(err, "Read YAML file error") + } + + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(rawBytes, nil, into) + if err != nil { + return nil, pkgerrors.Wrap(err, "Deserialize YAML error") + } + + return obj, nil +} + +// CheckDatabaseConnection checks if the database is up and running and +// plugin can talk to it +func CheckDatabaseConnection() error { + err := db.CreateDBClient(config.GetConfiguration().DatabaseType) + if err != nil { + return pkgerrors.Cause(err) + } + + err = db.DBconn.HealthCheck() + if err != nil { + return pkgerrors.Cause(err) + } + // TODO Convert these to configuration files instead of environment variables. + c := db.EtcdConfig{ + Endpoint: config.GetConfiguration().EtcdIP, + CertFile: config.GetConfiguration().EtcdCert, + KeyFile: config.GetConfiguration().EtcdKey, + CAFile: config.GetConfiguration().EtcdCAFile, + } + err = db.NewEtcdClient(nil, c) + if err != nil { + log.Printf("Etcd Client Initialization failed with error: %s", err.Error()) + } + return nil +} + +// LoadPlugins loads all the compiled .so plugins +func LoadPlugins() error { + pluginsDir := config.GetConfiguration().PluginDir + err := filepath.Walk(pluginsDir, + func(path string, info os.FileInfo, err error) error { + if strings.Contains(path, ".so") { + p, err := plugin.Open(path) + if err != nil { + return pkgerrors.Cause(err) + } + LoadedPlugins[info.Name()[:len(info.Name())-3]] = p + } + return err + }) + if err != nil { + return err + } + + return nil +} + +// CheckInitialSettings is used to check initial settings required to start api +func CheckInitialSettings() error { + err := CheckDatabaseConnection() + if err != nil { + return pkgerrors.Cause(err) + } + + err = LoadPlugins() + if err != nil { + return pkgerrors.Cause(err) + } + + return nil +} + +//EnsureDirectory makes sure that the directories specified in the path exist +//If not, it will create them, if possible. +func EnsureDirectory(f string) error { + base := path.Dir(f) + _, err := os.Stat(base) + if err != nil && !os.IsNotExist(err) { + return err + } + return os.MkdirAll(base, 0755) +} diff --git a/src/k8splugin/internal/utils/utils_test.go b/src/k8splugin/internal/utils/utils_test.go new file mode 100644 index 00000000..908ce92e --- /dev/null +++ b/src/k8splugin/internal/utils/utils_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2018 Intel Corporation. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "strings" + "testing" + + appsV1 "k8s.io/api/apps/v1" + coreV1 "k8s.io/api/core/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestDecodeYAML(t *testing.T) { + testCases := []struct { + label string + input string + expectedResult runtime.Object + expectedError string + }{ + { + label: "Fail to read non-existing YAML file", + input: "unexisting-file.yaml", + expectedError: "not found", + }, + { + label: "Fail to read invalid YAML format", + input: "./utils_test.go", + expectedError: "mapping values are not allowed in this contex", + }, + { + label: "Successfully read YAML file", + input: "../../mock_files/mock_yamls/deployment.yaml", + expectedResult: &appsV1.Deployment{ + ObjectMeta: metaV1.ObjectMeta{ + Name: "mock-deployment", + }, + Spec: appsV1.DeploymentSpec{ + Template: coreV1.PodTemplateSpec{ + ObjectMeta: metaV1.ObjectMeta{ + Labels: map[string]string{"app": "sise"}, + }, + Spec: coreV1.PodSpec{ + Containers: []coreV1.Container{ + coreV1.Container{ + Name: "sise", + Image: "mhausenblas/simpleservice:0.5.0", + }, + }, + }, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.label, func(t *testing.T) { + result, err := DecodeYAML(testCase.input, nil) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Decode YAML method return an un-expected (%s)", err) + } + if !strings.Contains(string(err.Error()), testCase.expectedError) { + t.Fatalf("Decode YAML method returned an error (%s)", err) + } + } else { + if testCase.expectedError != "" && testCase.expectedResult == nil { + t.Fatalf("Decode YAML method was expecting \"%s\" error message", testCase.expectedError) + } + if result == nil { + t.Fatal("Decode YAML method returned nil result") + } + // if !reflect.DeepEqual(testCase.expectedResult, result) { + + // t.Fatalf("Decode YAML method returned: \n%v\n and it was expected: \n%v", result, testCase.expectedResult) + // } + } + }) + } +} diff --git a/src/k8splugin/internal/utils_test.go b/src/k8splugin/internal/utils_test.go deleted file mode 100644 index 58b17bc6..00000000 --- a/src/k8splugin/internal/utils_test.go +++ /dev/null @@ -1,93 +0,0 @@ -/* -Copyright 2018 Intel Corporation. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package utils - -import ( - "strings" - "testing" - - appsV1 "k8s.io/api/apps/v1" - coreV1 "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -func TestDecodeYAML(t *testing.T) { - testCases := []struct { - label string - input string - expectedResult runtime.Object - expectedError string - }{ - { - label: "Fail to read non-existing YAML file", - input: "unexisting-file.yaml", - expectedError: "not found", - }, - { - label: "Fail to read invalid YAML format", - input: "./utils_test.go", - expectedError: "mapping values are not allowed in this contex", - }, - { - label: "Successfully read YAML file", - input: "../mock_files/mock_yamls/deployment.yaml", - expectedResult: &appsV1.Deployment{ - ObjectMeta: metaV1.ObjectMeta{ - Name: "mock-deployment", - }, - Spec: appsV1.DeploymentSpec{ - Template: coreV1.PodTemplateSpec{ - ObjectMeta: metaV1.ObjectMeta{ - Labels: map[string]string{"app": "sise"}, - }, - Spec: coreV1.PodSpec{ - Containers: []coreV1.Container{ - coreV1.Container{ - Name: "sise", - Image: "mhausenblas/simpleservice:0.5.0", - }, - }, - }, - }, - }, - }, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.label, func(t *testing.T) { - result, err := DecodeYAML(testCase.input, nil) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Decode YAML method return an un-expected (%s)", err) - } - if !strings.Contains(string(err.Error()), testCase.expectedError) { - t.Fatalf("Decode YAML method returned an error (%s)", err) - } - } else { - if testCase.expectedError != "" && testCase.expectedResult == nil { - t.Fatalf("Decode YAML method was expecting \"%s\" error message", testCase.expectedError) - } - if result == nil { - t.Fatal("Decode YAML method returned nil result") - } - // if !reflect.DeepEqual(testCase.expectedResult, result) { - - // t.Fatalf("Decode YAML method returned: \n%v\n and it was expected: \n%v", result, testCase.expectedResult) - // } - } - }) - } -} diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go index f38fee78..61d84fe3 100644 --- a/src/k8splugin/plugins/generic/plugin.go +++ b/src/k8splugin/plugins/generic/plugin.go @@ -15,13 +15,13 @@ package main import ( "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" pkgerrors "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" diff --git a/src/k8splugin/plugins/namespace/plugin.go b/src/k8splugin/plugins/namespace/plugin.go index 851a5568..acbfb00d 100644 --- a/src/k8splugin/plugins/namespace/plugin.go +++ b/src/k8splugin/plugins/namespace/plugin.go @@ -15,6 +15,7 @@ package main import ( "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "log" pkgerrors "github.com/pkg/errors" @@ -22,7 +23,6 @@ import ( metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" ) diff --git a/src/k8splugin/plugins/service/plugin.go b/src/k8splugin/plugins/service/plugin.go index ba1decbb..4837ea56 100644 --- a/src/k8splugin/plugins/service/plugin.go +++ b/src/k8splugin/plugins/service/plugin.go @@ -15,6 +15,7 @@ package main import ( "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "log" pkgerrors "github.com/pkg/errors" @@ -22,7 +23,6 @@ import ( metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" -- cgit 1.2.3-korg