diff options
Diffstat (limited to 'src/k8splugin/internal/statuscheck')
-rw-r--r-- | src/k8splugin/internal/statuscheck/converter.go | 69 | ||||
-rw-r--r-- | src/k8splugin/internal/statuscheck/ready.go | 393 | ||||
-rw-r--r-- | src/k8splugin/internal/statuscheck/ready_test.go | 517 | ||||
-rw-r--r-- | src/k8splugin/internal/statuscheck/resource.go | 85 | ||||
-rw-r--r-- | src/k8splugin/internal/statuscheck/resource_test.go | 61 | ||||
-rw-r--r-- | src/k8splugin/internal/statuscheck/wait.go | 109 |
6 files changed, 1234 insertions, 0 deletions
diff --git a/src/k8splugin/internal/statuscheck/converter.go b/src/k8splugin/internal/statuscheck/converter.go new file mode 100644 index 00000000..8f411c41 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/converter.go @@ -0,0 +1,69 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "sync" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes/scheme" +) + +var k8sNativeScheme *runtime.Scheme +var k8sNativeSchemeOnce sync.Once + +// AsVersioned converts the given info into a runtime.Object with the correct +// group and version set +func AsVersioned(info *resource.Info) runtime.Object { + return convertWithMapper(info.Object, info.Mapping) +} + +// convertWithMapper converts the given object with the optional provided +// RESTMapping. If no mapping is provided, the default schema versioner is used +func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object { + s := kubernetesNativeScheme() + var gv = runtime.GroupVersioner(schema.GroupVersions(s.PrioritizedVersionsAllGroups())) + if mapping != nil { + gv = mapping.GroupVersionKind.GroupVersion() + } + if obj, err := runtime.ObjectConvertor(s).ConvertToVersion(obj, gv); err == nil { + return obj + } + return obj +} + +// kubernetesNativeScheme returns a clean *runtime.Scheme with _only_ Kubernetes +// native resources added to it. This is required to break free of custom resources +// that may have been added to scheme.Scheme due to Helm being used as a package in +// combination with e.g. a versioned kube client. If we would not do this, the client +// may attempt to perform e.g. a 3-way-merge strategy patch for custom resources. +func kubernetesNativeScheme() *runtime.Scheme { + k8sNativeSchemeOnce.Do(func() { + k8sNativeScheme = runtime.NewScheme() + scheme.AddToScheme(k8sNativeScheme) + // API extensions are not in the above scheme set, + // and must thus be added separately. + apiextensionsv1beta1.AddToScheme(k8sNativeScheme) + apiextensionsv1.AddToScheme(k8sNativeScheme) + }) + return k8sNativeScheme +} diff --git a/src/k8splugin/internal/statuscheck/ready.go b/src/k8splugin/internal/statuscheck/ready.go new file mode 100644 index 00000000..7bea536a --- /dev/null +++ b/src/k8splugin/internal/statuscheck/ready.go @@ -0,0 +1,393 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "log" +) + +// ReadyCheckerOption is a function that configures a ReadyChecker. +type ReadyCheckerOption func(*ReadyChecker) + +// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker +// to consider paused resources to be ready. For example a Deployment +// with spec.paused equal to true would be considered ready. +func PausedAsReady(pausedAsReady bool) ReadyCheckerOption { + return func(c *ReadyChecker) { + c.pausedAsReady = pausedAsReady + } +} + +// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker +// to consider readiness of Job resources. +func CheckJobs(checkJobs bool) ReadyCheckerOption { + return func(c *ReadyChecker) { + c.checkJobs = checkJobs + } +} + +// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can +// be used to override defaults. +func NewReadyChecker(cl kubernetes.Interface, opts ...ReadyCheckerOption) ReadyChecker { + c := ReadyChecker{ + client: cl, + } + + for _, opt := range opts { + opt(&c) + } + + return c +} + +// ReadyChecker is a type that can check core Kubernetes types for readiness. +type ReadyChecker struct { + client kubernetes.Interface + checkJobs bool + pausedAsReady bool +} + +// IsReady checks if v is ready. It supports checking readiness for pods, +// deployments, persistent volume claims, services, daemon sets, custom +// resource definitions, stateful sets, replication controllers, and replica +// sets. All other resource kinds are always considered ready. +// +// IsReady will fetch the latest state of the object from the server prior to +// performing readiness checks, and it will return any error encountered. +func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) { + var ( + // This defaults to true, otherwise we get to a point where + // things will always return false unless one of the objects + // that manages pods has been hit + ok = true + err error + ) + switch value := AsVersioned(v).(type) { + case *corev1.Pod: + pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil || !c.isPodReady(pod) { + return false, err + } + case *batchv1.Job: + if c.checkJobs { + job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil || !c.jobReady(job) { + return false, err + } + } + case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment: + currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + // If paused deployment will never be ready + if currentDeployment.Spec.Paused { + return c.pausedAsReady, nil + } + // Find RS associated with deployment + newReplicaSet, err := utils.GetNewReplicaSet(currentDeployment, c.client.AppsV1()) + if err != nil || newReplicaSet == nil { + return false, err + } + if !c.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil + } + case *corev1.PersistentVolumeClaim: + claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.volumeReady(claim) { + return false, nil + } + case *corev1.Service: + svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.serviceReady(svc) { + return false, nil + } + case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet: + ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.daemonSetReady(ds) { + return false, nil + } + case *apiextv1beta1.CustomResourceDefinition: + if err := v.Get(); err != nil { + return false, err + } + crd := &apiextv1beta1.CustomResourceDefinition{} + if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil { + return false, err + } + if !c.crdBetaReady(*crd) { + return false, nil + } + case *apiextv1.CustomResourceDefinition: + if err := v.Get(); err != nil { + return false, err + } + crd := &apiextv1.CustomResourceDefinition{} + if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil { + return false, err + } + if !c.crdReady(*crd) { + return false, nil + } + case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet: + sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !c.statefulSetReady(sts) { + return false, nil + } + case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet: + ok, err = c.podsReadyForObject(ctx, v.Namespace, value) + } + if !ok || err != nil { + return false, err + } + return true, nil +} + +func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) { + pods, err := c.podsforObject(ctx, namespace, obj) + if err != nil { + return false, err + } + for _, pod := range pods { + if !c.isPodReady(&pod) { + return false, nil + } + } + return true, nil +} + +func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) { + selector, err := SelectorsForObject(obj) + if err != nil { + return nil, err + } + list, err := getPods(ctx, c.client, namespace, selector.String()) + return list, err +} + +// isPodReady returns true if a pod is ready; false otherwise. +func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return true + } + } + log.Printf("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) + return false +} + +func (c *ReadyChecker) jobReady(job *batchv1.Job) bool { + if job.Status.Failed > *job.Spec.BackoffLimit { + log.Printf("Job is failed: %s/%s", job.GetNamespace(), job.GetName()) + return false + } + if job.Status.Succeeded < *job.Spec.Completions { + log.Printf("Job is not completed: %s/%s", job.GetNamespace(), job.GetName()) + return false + } + return true +} + +func (c *ReadyChecker) serviceReady(s *corev1.Service) bool { + // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set) + if s.Spec.Type == corev1.ServiceTypeExternalName { + return true + } + + // Ensure that the service cluster IP is not empty + if s.Spec.ClusterIP == "" { + log.Printf("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName()) + return false + } + + // This checks if the service has a LoadBalancer and that balancer has an Ingress defined + if s.Spec.Type == corev1.ServiceTypeLoadBalancer { + // do not wait when at least 1 external IP is set + if len(s.Spec.ExternalIPs) > 0 { + log.Printf("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs) + return true + } + + if s.Status.LoadBalancer.Ingress == nil { + log.Printf("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName()) + return false + } + } + + return true +} + +func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool { + if v.Status.Phase != corev1.ClaimBound { + log.Printf("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName()) + return false + } + return true +} + +func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool { + expectedReady := *dep.Spec.Replicas - utils.MaxUnavailable(*dep) + if !(rs.Status.ReadyReplicas >= expectedReady) { + log.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady) + return false + } + return true +} + +func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool { + // If the update strategy is not a rolling update, there will be nothing to wait for + if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType { + return true + } + + // Make sure all the updated pods have been scheduled + if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled { + log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled) + return false + } + maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true) + if err != nil { + // If for some reason the value is invalid, set max unavailable to the + // number of desired replicas. This is the same behavior as the + // `MaxUnavailable` function in deploymentutil + maxUnavailable = int(ds.Status.DesiredNumberScheduled) + } + + expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable + if !(int(ds.Status.NumberReady) >= expectedReady) { + log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady) + return false + } + return true +} + +// Because the v1 extensions API is not available on all supported k8s versions +// yet and because Go doesn't support generics, we need to have a duplicate +// function to support the v1beta1 types +func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool { + for _, cond := range crd.Status.Conditions { + switch cond.Type { + case apiextv1beta1.Established: + if cond.Status == apiextv1beta1.ConditionTrue { + return true + } + case apiextv1beta1.NamesAccepted: + if cond.Status == apiextv1beta1.ConditionFalse { + // This indicates a naming conflict, but it's probably not the + // job of this function to fail because of that. Instead, + // we treat it as a success, since the process should be able to + // continue. + return true + } + } + } + return false +} + +func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool { + for _, cond := range crd.Status.Conditions { + switch cond.Type { + case apiextv1.Established: + if cond.Status == apiextv1.ConditionTrue { + return true + } + case apiextv1.NamesAccepted: + if cond.Status == apiextv1.ConditionFalse { + // This indicates a naming conflict, but it's probably not the + // job of this function to fail because of that. Instead, + // we treat it as a success, since the process should be able to + // continue. + return true + } + } + } + return false +} + +func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool { + // If the update strategy is not a rolling update, there will be nothing to wait for + if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return true + } + + // Dereference all the pointers because StatefulSets like them + var partition int + // 1 is the default for replicas if not set + var replicas = 1 + // For some reason, even if the update strategy is a rolling update, the + // actual rollingUpdate field can be nil. If it is, we can safely assume + // there is no partition value + if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) + } + if sts.Spec.Replicas != nil { + replicas = int(*sts.Spec.Replicas) + } + + // Because an update strategy can use partitioning, we need to calculate the + // number of updated replicas we should have. For example, if the replicas + // is set to 3 and the partition is 2, we'd expect only one pod to be + // updated + expectedReplicas := replicas - partition + + // Make sure all the updated pods have been scheduled + if int(sts.Status.UpdatedReplicas) != expectedReplicas { + log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas) + return false + } + + if int(sts.Status.ReadyReplicas) != replicas { + log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas) + return false + } + return true +} + +func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) { + list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: selector, + }) + return list.Items, err +} diff --git a/src/k8splugin/internal/statuscheck/ready_test.go b/src/k8splugin/internal/statuscheck/ready_test.go new file mode 100644 index 00000000..e1db16f4 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/ready_test.go @@ -0,0 +1,517 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "testing" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" +) + +const defaultNamespace = metav1.NamespaceDefault + +func Test_ReadyChecker_deploymentReady(t *testing.T) { + type args struct { + rs *appsv1.ReplicaSet + dep *appsv1.Deployment + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "deployment is ready", + args: args{ + rs: newReplicaSet("foo", 1, 1), + dep: newDeployment("foo", 1, 1, 0), + }, + want: true, + }, + { + name: "deployment is not ready", + args: args{ + rs: newReplicaSet("foo", 0, 0), + dep: newDeployment("foo", 1, 1, 0), + }, + want: false, + }, + { + name: "deployment is ready when maxUnavailable is set", + args: args{ + rs: newReplicaSet("foo", 2, 1), + dep: newDeployment("foo", 2, 1, 1), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want { + t.Errorf("deploymentReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_daemonSetReady(t *testing.T) { + type args struct { + ds *appsv1.DaemonSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "daemonset is ready", + args: args{ + ds: newDaemonSet("foo", 0, 1, 1, 1), + }, + want: true, + }, + { + name: "daemonset is not ready", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 1), + }, + want: false, + }, + { + name: "daemonset pods have not been scheduled successfully", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 0), + }, + want: false, + }, + { + name: "daemonset is ready when maxUnavailable is set", + args: args{ + ds: newDaemonSet("foo", 1, 1, 2, 2), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.daemonSetReady(tt.args.ds); got != tt.want { + t.Errorf("daemonSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_statefulSetReady(t *testing.T) { + type args struct { + sts *appsv1.StatefulSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "statefulset is ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 1, 1), + }, + want: true, + }, + { + name: "statefulset is not ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 0, 1), + }, + want: false, + }, + { + name: "statefulset is ready when partition is specified", + args: args{ + sts: newStatefulSet("foo", 2, 1, 2, 1), + }, + want: true, + }, + { + name: "statefulset is not ready when partition is set", + args: args{ + sts: newStatefulSet("foo", 1, 1, 1, 1), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.statefulSetReady(tt.args.sts); got != tt.want { + t.Errorf("statefulSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_podsReadyForObject(t *testing.T) { + type args struct { + namespace string + obj runtime.Object + } + tests := []struct { + name string + args args + existPods []corev1.Pod + want bool + wantErr bool + }{ + { + name: "pods ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionTrue), + }, + want: true, + wantErr: false, + }, + { + name: "pods not ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionFalse), + }, + want: false, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + for _, pod := range tt.existPods { + if _, err := c.client.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create Pod error: %v", err) + return + } + } + got, err := c.podsReadyForObject(context.TODO(), tt.args.namespace, tt.args.obj) + if (err != nil) != tt.wantErr { + t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_jobReady(t *testing.T) { + type args struct { + job *batchv1.Job + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "job is completed", + args: args{job: newJob("foo", 1, 1, 1, 0)}, + want: true, + }, + { + name: "job is incomplete", + args: args{job: newJob("foo", 1, 1, 0, 0)}, + want: false, + }, + { + name: "job is failed", + args: args{job: newJob("foo", 1, 1, 0, 1)}, + want: false, + }, + { + name: "job is completed with retry", + args: args{job: newJob("foo", 1, 1, 1, 1)}, + want: true, + }, + { + name: "job is failed with retry", + args: args{job: newJob("foo", 1, 1, 0, 2)}, + want: false, + }, + { + name: "job is completed single run", + args: args{job: newJob("foo", 0, 1, 1, 0)}, + want: true, + }, + { + name: "job is failed single run", + args: args{job: newJob("foo", 0, 1, 0, 1)}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.jobReady(tt.args.job); got != tt.want { + t.Errorf("jobReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_ReadyChecker_volumeReady(t *testing.T) { + type args struct { + v *corev1.PersistentVolumeClaim + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "pvc is bound", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimBound), + }, + want: true, + }, + { + name: "pvc is not ready", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimPending), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false)) + if got := c.volumeReady(tt.args.v); got != tt.want { + t.Errorf("volumeReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + }, + }, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: int32(desiredNumberScheduled), + NumberReady: int32(numberReady), + UpdatedNumberScheduled: int32(updatedNumberScheduled), + }, + } +} + +func newStatefulSet(name string, replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.StatefulSetSpec{ + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: intToInt32(partition), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.StatefulSetStatus{ + UpdatedReplicas: int32(updatedReplicas), + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DeploymentSpec{ + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + } +} + +func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet { + d := newDeployment(name, replicas, 0, 0) + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: d.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())}, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: d.Spec.Selector, + Replicas: intToInt32(replicas), + Template: d.Spec.Template, + }, + Status: appsv1.ReplicaSetStatus{ + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: podReadyCondition, + }, + }, + }, + } +} + +func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: phase, + }, + } +} + +func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: intToInt32(backoffLimit), + Completions: intToInt32(completions), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: batchv1.JobStatus{ + Succeeded: int32(succeeded), + Failed: int32(failed), + }, + } +} + +func intToInt32(i int) *int32 { + i32 := int32(i) + return &i32 +} diff --git a/src/k8splugin/internal/statuscheck/resource.go b/src/k8splugin/internal/statuscheck/resource.go new file mode 100644 index 00000000..598af2fb --- /dev/null +++ b/src/k8splugin/internal/statuscheck/resource.go @@ -0,0 +1,85 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import "k8s.io/cli-runtime/pkg/resource" + +// ResourceList provides convenience methods for comparing collections of Infos. +type ResourceList []*resource.Info + +// Append adds an Info to the Result. +func (r *ResourceList) Append(val *resource.Info) { + *r = append(*r, val) +} + +// Visit implements resource.Visitor. +func (r ResourceList) Visit(fn resource.VisitorFunc) error { + for _, i := range r { + if err := fn(i, nil); err != nil { + return err + } + } + return nil +} + +// Filter returns a new Result with Infos that satisfy the predicate fn. +func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList { + var result ResourceList + for _, i := range r { + if fn(i) { + result.Append(i) + } + } + return result +} + +// Get returns the Info from the result that matches the name and kind. +func (r ResourceList) Get(info *resource.Info) *resource.Info { + for _, i := range r { + if isMatchingInfo(i, info) { + return i + } + } + return nil +} + +// Contains checks to see if an object exists. +func (r ResourceList) Contains(info *resource.Info) bool { + for _, i := range r { + if isMatchingInfo(i, info) { + return true + } + } + return false +} + +// Difference will return a new Result with objects not contained in rs. +func (r ResourceList) Difference(rs ResourceList) ResourceList { + return r.Filter(func(info *resource.Info) bool { + return !rs.Contains(info) + }) +} + +// Intersect will return a new Result with objects contained in both Results. +func (r ResourceList) Intersect(rs ResourceList) ResourceList { + return r.Filter(rs.Contains) +} + +// isMatchingInfo returns true if infos match on Name and GroupVersionKind. +func isMatchingInfo(a, b *resource.Info) bool { + return a.Name == b.Name && a.Namespace == b.Namespace && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind +} diff --git a/src/k8splugin/internal/statuscheck/resource_test.go b/src/k8splugin/internal/statuscheck/resource_test.go new file mode 100644 index 00000000..532cebfd --- /dev/null +++ b/src/k8splugin/internal/statuscheck/resource_test.go @@ -0,0 +1,61 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" +) + +func TestResourceList(t *testing.T) { + mapping := &meta.RESTMapping{ + Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"}, + } + + info := func(name string) *resource.Info { + return &resource.Info{Name: name, Mapping: mapping} + } + + var r1, r2 ResourceList + r1 = []*resource.Info{info("foo"), info("bar")} + r2 = []*resource.Info{info("bar")} + + if r1.Get(info("bar")).Mapping.Resource.Resource != "pod" { + t.Error("expected get pod") + } + + diff := r1.Difference(r2) + if len(diff) != 1 { + t.Error("expected 1 result") + } + + if !diff.Contains(info("foo")) { + t.Error("expected diff to return foo") + } + + inter := r1.Intersect(r2) + if len(inter) != 1 { + t.Error("expected 1 result") + } + + if !inter.Contains(info("bar")) { + t.Error("expected intersect to return bar") + } +} diff --git a/src/k8splugin/internal/statuscheck/wait.go b/src/k8splugin/internal/statuscheck/wait.go new file mode 100644 index 00000000..41d90d91 --- /dev/null +++ b/src/k8splugin/internal/statuscheck/wait.go @@ -0,0 +1,109 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statuscheck // import "helm.sh/helm/v3/pkg/kube" + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + + "k8s.io/apimachinery/pkg/util/wait" +) + +type waiter struct { + c ReadyChecker + timeout time.Duration + log func(string, ...interface{}) +} + +// waitForResources polls to get the current status of all pods, PVCs, Services and +// Jobs(optional) until all are ready or a timeout is reached +func (w *waiter) waitForResources(created ResourceList) error { + w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) + + ctx, cancel := context.WithTimeout(context.Background(), w.timeout) + defer cancel() + + return wait.PollImmediateUntil(2*time.Second, func() (bool, error) { + for _, v := range created { + ready, err := w.c.IsReady(ctx, v) + if !ready || err != nil { + return false, err + } + } + return true, nil + }, ctx.Done()) +} + +// SelectorsForObject returns the pod label selector for a given object +// +// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84 +func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) { + switch t := object.(type) { + case *extensionsv1beta1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.ReplicationController: + selector = labels.SelectorFromSet(t.Spec.Selector) + case *appsv1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *batchv1.Job: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.Service: + if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 { + return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name) + } + selector = labels.SelectorFromSet(t.Spec.Selector) + + default: + return nil, fmt.Errorf("selector for %T not implemented", object) + } + + return selector, errors.Wrap(err, "invalid label selector") +} |