aboutsummaryrefslogtreecommitdiffstats
path: root/src/k8splugin/internal/statuscheck
diff options
context:
space:
mode:
Diffstat (limited to 'src/k8splugin/internal/statuscheck')
-rw-r--r--src/k8splugin/internal/statuscheck/converter.go69
-rw-r--r--src/k8splugin/internal/statuscheck/ready.go393
-rw-r--r--src/k8splugin/internal/statuscheck/ready_test.go517
-rw-r--r--src/k8splugin/internal/statuscheck/resource.go85
-rw-r--r--src/k8splugin/internal/statuscheck/resource_test.go61
-rw-r--r--src/k8splugin/internal/statuscheck/wait.go109
6 files changed, 1234 insertions, 0 deletions
diff --git a/src/k8splugin/internal/statuscheck/converter.go b/src/k8splugin/internal/statuscheck/converter.go
new file mode 100644
index 00000000..8f411c41
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/converter.go
@@ -0,0 +1,69 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+ "sync"
+
+ apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes/scheme"
+)
+
+var k8sNativeScheme *runtime.Scheme
+var k8sNativeSchemeOnce sync.Once
+
+// AsVersioned converts the given info into a runtime.Object with the correct
+// group and version set
+func AsVersioned(info *resource.Info) runtime.Object {
+ return convertWithMapper(info.Object, info.Mapping)
+}
+
+// convertWithMapper converts the given object with the optional provided
+// RESTMapping. If no mapping is provided, the default schema versioner is used
+func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object {
+ s := kubernetesNativeScheme()
+ var gv = runtime.GroupVersioner(schema.GroupVersions(s.PrioritizedVersionsAllGroups()))
+ if mapping != nil {
+ gv = mapping.GroupVersionKind.GroupVersion()
+ }
+ if obj, err := runtime.ObjectConvertor(s).ConvertToVersion(obj, gv); err == nil {
+ return obj
+ }
+ return obj
+}
+
+// kubernetesNativeScheme returns a clean *runtime.Scheme with _only_ Kubernetes
+// native resources added to it. This is required to break free of custom resources
+// that may have been added to scheme.Scheme due to Helm being used as a package in
+// combination with e.g. a versioned kube client. If we would not do this, the client
+// may attempt to perform e.g. a 3-way-merge strategy patch for custom resources.
+func kubernetesNativeScheme() *runtime.Scheme {
+ k8sNativeSchemeOnce.Do(func() {
+ k8sNativeScheme = runtime.NewScheme()
+ scheme.AddToScheme(k8sNativeScheme)
+ // API extensions are not in the above scheme set,
+ // and must thus be added separately.
+ apiextensionsv1beta1.AddToScheme(k8sNativeScheme)
+ apiextensionsv1.AddToScheme(k8sNativeScheme)
+ })
+ return k8sNativeScheme
+}
diff --git a/src/k8splugin/internal/statuscheck/ready.go b/src/k8splugin/internal/statuscheck/ready.go
new file mode 100644
index 00000000..7bea536a
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/ready.go
@@ -0,0 +1,393 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+ "context"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ appsv1 "k8s.io/api/apps/v1"
+ appsv1beta1 "k8s.io/api/apps/v1beta1"
+ appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+ apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "log"
+)
+
+// ReadyCheckerOption is a function that configures a ReadyChecker.
+type ReadyCheckerOption func(*ReadyChecker)
+
+// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider paused resources to be ready. For example a Deployment
+// with spec.paused equal to true would be considered ready.
+func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
+ return func(c *ReadyChecker) {
+ c.pausedAsReady = pausedAsReady
+ }
+}
+
+// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider readiness of Job resources.
+func CheckJobs(checkJobs bool) ReadyCheckerOption {
+ return func(c *ReadyChecker) {
+ c.checkJobs = checkJobs
+ }
+}
+
+// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can
+// be used to override defaults.
+func NewReadyChecker(cl kubernetes.Interface, opts ...ReadyCheckerOption) ReadyChecker {
+ c := ReadyChecker{
+ client: cl,
+ }
+
+ for _, opt := range opts {
+ opt(&c)
+ }
+
+ return c
+}
+
+// ReadyChecker is a type that can check core Kubernetes types for readiness.
+type ReadyChecker struct {
+ client kubernetes.Interface
+ checkJobs bool
+ pausedAsReady bool
+}
+
+// IsReady checks if v is ready. It supports checking readiness for pods,
+// deployments, persistent volume claims, services, daemon sets, custom
+// resource definitions, stateful sets, replication controllers, and replica
+// sets. All other resource kinds are always considered ready.
+//
+// IsReady will fetch the latest state of the object from the server prior to
+// performing readiness checks, and it will return any error encountered.
+func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
+ var (
+ // This defaults to true, otherwise we get to a point where
+ // things will always return false unless one of the objects
+ // that manages pods has been hit
+ ok = true
+ err error
+ )
+ switch value := AsVersioned(v).(type) {
+ case *corev1.Pod:
+ pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil || !c.isPodReady(pod) {
+ return false, err
+ }
+ case *batchv1.Job:
+ if c.checkJobs {
+ job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil || !c.jobReady(job) {
+ return false, err
+ }
+ }
+ case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
+ currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ // If paused deployment will never be ready
+ if currentDeployment.Spec.Paused {
+ return c.pausedAsReady, nil
+ }
+ // Find RS associated with deployment
+ newReplicaSet, err := utils.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
+ if err != nil || newReplicaSet == nil {
+ return false, err
+ }
+ if !c.deploymentReady(newReplicaSet, currentDeployment) {
+ return false, nil
+ }
+ case *corev1.PersistentVolumeClaim:
+ claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.volumeReady(claim) {
+ return false, nil
+ }
+ case *corev1.Service:
+ svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.serviceReady(svc) {
+ return false, nil
+ }
+ case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
+ ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.daemonSetReady(ds) {
+ return false, nil
+ }
+ case *apiextv1beta1.CustomResourceDefinition:
+ if err := v.Get(); err != nil {
+ return false, err
+ }
+ crd := &apiextv1beta1.CustomResourceDefinition{}
+ if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+ return false, err
+ }
+ if !c.crdBetaReady(*crd) {
+ return false, nil
+ }
+ case *apiextv1.CustomResourceDefinition:
+ if err := v.Get(); err != nil {
+ return false, err
+ }
+ crd := &apiextv1.CustomResourceDefinition{}
+ if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+ return false, err
+ }
+ if !c.crdReady(*crd) {
+ return false, nil
+ }
+ case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
+ sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.statefulSetReady(sts) {
+ return false, nil
+ }
+ case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
+ ok, err = c.podsReadyForObject(ctx, v.Namespace, value)
+ }
+ if !ok || err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
+func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
+ pods, err := c.podsforObject(ctx, namespace, obj)
+ if err != nil {
+ return false, err
+ }
+ for _, pod := range pods {
+ if !c.isPodReady(&pod) {
+ return false, nil
+ }
+ }
+ return true, nil
+}
+
+func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
+ selector, err := SelectorsForObject(obj)
+ if err != nil {
+ return nil, err
+ }
+ list, err := getPods(ctx, c.client, namespace, selector.String())
+ return list, err
+}
+
+// isPodReady returns true if a pod is ready; false otherwise.
+func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
+ for _, c := range pod.Status.Conditions {
+ if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
+ return true
+ }
+ }
+ log.Printf("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
+ return false
+}
+
+func (c *ReadyChecker) jobReady(job *batchv1.Job) bool {
+ if job.Status.Failed > *job.Spec.BackoffLimit {
+ log.Printf("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
+ return false
+ }
+ if job.Status.Succeeded < *job.Spec.Completions {
+ log.Printf("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
+ // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
+ if s.Spec.Type == corev1.ServiceTypeExternalName {
+ return true
+ }
+
+ // Ensure that the service cluster IP is not empty
+ if s.Spec.ClusterIP == "" {
+ log.Printf("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
+ return false
+ }
+
+ // This checks if the service has a LoadBalancer and that balancer has an Ingress defined
+ if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
+ // do not wait when at least 1 external IP is set
+ if len(s.Spec.ExternalIPs) > 0 {
+ log.Printf("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
+ return true
+ }
+
+ if s.Status.LoadBalancer.Ingress == nil {
+ log.Printf("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
+ return false
+ }
+ }
+
+ return true
+}
+
+func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
+ if v.Status.Phase != corev1.ClaimBound {
+ log.Printf("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
+ expectedReady := *dep.Spec.Replicas - utils.MaxUnavailable(*dep)
+ if !(rs.Status.ReadyReplicas >= expectedReady) {
+ log.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
+ return true
+ }
+
+ // Make sure all the updated pods have been scheduled
+ if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
+ log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
+ return false
+ }
+ maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
+ if err != nil {
+ // If for some reason the value is invalid, set max unavailable to the
+ // number of desired replicas. This is the same behavior as the
+ // `MaxUnavailable` function in deploymentutil
+ maxUnavailable = int(ds.Status.DesiredNumberScheduled)
+ }
+
+ expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
+ if !(int(ds.Status.NumberReady) >= expectedReady) {
+ log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
+ return false
+ }
+ return true
+}
+
+// Because the v1 extensions API is not available on all supported k8s versions
+// yet and because Go doesn't support generics, we need to have a duplicate
+// function to support the v1beta1 types
+func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
+ for _, cond := range crd.Status.Conditions {
+ switch cond.Type {
+ case apiextv1beta1.Established:
+ if cond.Status == apiextv1beta1.ConditionTrue {
+ return true
+ }
+ case apiextv1beta1.NamesAccepted:
+ if cond.Status == apiextv1beta1.ConditionFalse {
+ // This indicates a naming conflict, but it's probably not the
+ // job of this function to fail because of that. Instead,
+ // we treat it as a success, since the process should be able to
+ // continue.
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
+ for _, cond := range crd.Status.Conditions {
+ switch cond.Type {
+ case apiextv1.Established:
+ if cond.Status == apiextv1.ConditionTrue {
+ return true
+ }
+ case apiextv1.NamesAccepted:
+ if cond.Status == apiextv1.ConditionFalse {
+ // This indicates a naming conflict, but it's probably not the
+ // job of this function to fail because of that. Instead,
+ // we treat it as a success, since the process should be able to
+ // continue.
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
+ return true
+ }
+
+ // Dereference all the pointers because StatefulSets like them
+ var partition int
+ // 1 is the default for replicas if not set
+ var replicas = 1
+ // For some reason, even if the update strategy is a rolling update, the
+ // actual rollingUpdate field can be nil. If it is, we can safely assume
+ // there is no partition value
+ if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+ partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
+ }
+ if sts.Spec.Replicas != nil {
+ replicas = int(*sts.Spec.Replicas)
+ }
+
+ // Because an update strategy can use partitioning, we need to calculate the
+ // number of updated replicas we should have. For example, if the replicas
+ // is set to 3 and the partition is 2, we'd expect only one pod to be
+ // updated
+ expectedReplicas := replicas - partition
+
+ // Make sure all the updated pods have been scheduled
+ if int(sts.Status.UpdatedReplicas) != expectedReplicas {
+ log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
+ return false
+ }
+
+ if int(sts.Status.ReadyReplicas) != replicas {
+ log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
+ return false
+ }
+ return true
+}
+
+func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
+ list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+ LabelSelector: selector,
+ })
+ return list.Items, err
+}
diff --git a/src/k8splugin/internal/statuscheck/ready_test.go b/src/k8splugin/internal/statuscheck/ready_test.go
new file mode 100644
index 00000000..e1db16f4
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/ready_test.go
@@ -0,0 +1,517 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+ "context"
+ "testing"
+
+ appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/client-go/kubernetes/fake"
+)
+
+const defaultNamespace = metav1.NamespaceDefault
+
+func Test_ReadyChecker_deploymentReady(t *testing.T) {
+ type args struct {
+ rs *appsv1.ReplicaSet
+ dep *appsv1.Deployment
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "deployment is ready",
+ args: args{
+ rs: newReplicaSet("foo", 1, 1),
+ dep: newDeployment("foo", 1, 1, 0),
+ },
+ want: true,
+ },
+ {
+ name: "deployment is not ready",
+ args: args{
+ rs: newReplicaSet("foo", 0, 0),
+ dep: newDeployment("foo", 1, 1, 0),
+ },
+ want: false,
+ },
+ {
+ name: "deployment is ready when maxUnavailable is set",
+ args: args{
+ rs: newReplicaSet("foo", 2, 1),
+ dep: newDeployment("foo", 2, 1, 1),
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want {
+ t.Errorf("deploymentReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_daemonSetReady(t *testing.T) {
+ type args struct {
+ ds *appsv1.DaemonSet
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "daemonset is ready",
+ args: args{
+ ds: newDaemonSet("foo", 0, 1, 1, 1),
+ },
+ want: true,
+ },
+ {
+ name: "daemonset is not ready",
+ args: args{
+ ds: newDaemonSet("foo", 0, 0, 1, 1),
+ },
+ want: false,
+ },
+ {
+ name: "daemonset pods have not been scheduled successfully",
+ args: args{
+ ds: newDaemonSet("foo", 0, 0, 1, 0),
+ },
+ want: false,
+ },
+ {
+ name: "daemonset is ready when maxUnavailable is set",
+ args: args{
+ ds: newDaemonSet("foo", 1, 1, 2, 2),
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.daemonSetReady(tt.args.ds); got != tt.want {
+ t.Errorf("daemonSetReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_statefulSetReady(t *testing.T) {
+ type args struct {
+ sts *appsv1.StatefulSet
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "statefulset is ready",
+ args: args{
+ sts: newStatefulSet("foo", 1, 0, 1, 1),
+ },
+ want: true,
+ },
+ {
+ name: "statefulset is not ready",
+ args: args{
+ sts: newStatefulSet("foo", 1, 0, 0, 1),
+ },
+ want: false,
+ },
+ {
+ name: "statefulset is ready when partition is specified",
+ args: args{
+ sts: newStatefulSet("foo", 2, 1, 2, 1),
+ },
+ want: true,
+ },
+ {
+ name: "statefulset is not ready when partition is set",
+ args: args{
+ sts: newStatefulSet("foo", 1, 1, 1, 1),
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.statefulSetReady(tt.args.sts); got != tt.want {
+ t.Errorf("statefulSetReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_podsReadyForObject(t *testing.T) {
+ type args struct {
+ namespace string
+ obj runtime.Object
+ }
+ tests := []struct {
+ name string
+ args args
+ existPods []corev1.Pod
+ want bool
+ wantErr bool
+ }{
+ {
+ name: "pods ready for a replicaset",
+ args: args{
+ namespace: defaultNamespace,
+ obj: newReplicaSet("foo", 1, 1),
+ },
+ existPods: []corev1.Pod{
+ *newPodWithCondition("foo", corev1.ConditionTrue),
+ },
+ want: true,
+ wantErr: false,
+ },
+ {
+ name: "pods not ready for a replicaset",
+ args: args{
+ namespace: defaultNamespace,
+ obj: newReplicaSet("foo", 1, 1),
+ },
+ existPods: []corev1.Pod{
+ *newPodWithCondition("foo", corev1.ConditionFalse),
+ },
+ want: false,
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ for _, pod := range tt.existPods {
+ if _, err := c.client.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil {
+ t.Errorf("Failed to create Pod error: %v", err)
+ return
+ }
+ }
+ got, err := c.podsReadyForObject(context.TODO(), tt.args.namespace, tt.args.obj)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_jobReady(t *testing.T) {
+ type args struct {
+ job *batchv1.Job
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "job is completed",
+ args: args{job: newJob("foo", 1, 1, 1, 0)},
+ want: true,
+ },
+ {
+ name: "job is incomplete",
+ args: args{job: newJob("foo", 1, 1, 0, 0)},
+ want: false,
+ },
+ {
+ name: "job is failed",
+ args: args{job: newJob("foo", 1, 1, 0, 1)},
+ want: false,
+ },
+ {
+ name: "job is completed with retry",
+ args: args{job: newJob("foo", 1, 1, 1, 1)},
+ want: true,
+ },
+ {
+ name: "job is failed with retry",
+ args: args{job: newJob("foo", 1, 1, 0, 2)},
+ want: false,
+ },
+ {
+ name: "job is completed single run",
+ args: args{job: newJob("foo", 0, 1, 1, 0)},
+ want: true,
+ },
+ {
+ name: "job is failed single run",
+ args: args{job: newJob("foo", 0, 1, 0, 1)},
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.jobReady(tt.args.job); got != tt.want {
+ t.Errorf("jobReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_volumeReady(t *testing.T) {
+ type args struct {
+ v *corev1.PersistentVolumeClaim
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "pvc is bound",
+ args: args{
+ v: newPersistentVolumeClaim("foo", corev1.ClaimBound),
+ },
+ want: true,
+ },
+ {
+ name: "pvc is not ready",
+ args: args{
+ v: newPersistentVolumeClaim("foo", corev1.ClaimPending),
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.volumeReady(tt.args.v); got != tt.want {
+ t.Errorf("volumeReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet {
+ return &appsv1.DaemonSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.DaemonSetSpec{
+ UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+ Type: appsv1.RollingUpdateDaemonSetStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateDaemonSet{
+ MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
+ },
+ },
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: appsv1.DaemonSetStatus{
+ DesiredNumberScheduled: int32(desiredNumberScheduled),
+ NumberReady: int32(numberReady),
+ UpdatedNumberScheduled: int32(updatedNumberScheduled),
+ },
+ }
+}
+
+func newStatefulSet(name string, replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet {
+ return &appsv1.StatefulSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.StatefulSetSpec{
+ UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+ Type: appsv1.RollingUpdateStatefulSetStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
+ Partition: intToInt32(partition),
+ },
+ },
+ Replicas: intToInt32(replicas),
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: appsv1.StatefulSetStatus{
+ UpdatedReplicas: int32(updatedReplicas),
+ ReadyReplicas: int32(readyReplicas),
+ },
+ }
+}
+
+func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment {
+ return &appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.DeploymentSpec{
+ Strategy: appsv1.DeploymentStrategy{
+ Type: appsv1.RollingUpdateDeploymentStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateDeployment{
+ MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
+ MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(),
+ },
+ },
+ Replicas: intToInt32(replicas),
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet {
+ d := newDeployment(name, replicas, 0, 0)
+ return &appsv1.ReplicaSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ Labels: d.Spec.Selector.MatchLabels,
+ OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())},
+ },
+ Spec: appsv1.ReplicaSetSpec{
+ Selector: d.Spec.Selector,
+ Replicas: intToInt32(replicas),
+ Template: d.Spec.Template,
+ },
+ Status: appsv1.ReplicaSetStatus{
+ ReadyReplicas: int32(readyReplicas),
+ },
+ }
+}
+
+func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod {
+ return &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ Status: corev1.PodStatus{
+ Conditions: []corev1.PodCondition{
+ {
+ Type: corev1.PodReady,
+ Status: podReadyCondition,
+ },
+ },
+ },
+ }
+}
+
+func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
+ return &corev1.PersistentVolumeClaim{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Status: corev1.PersistentVolumeClaimStatus{
+ Phase: phase,
+ },
+ }
+}
+
+func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job {
+ return &batchv1.Job{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: batchv1.JobSpec{
+ BackoffLimit: intToInt32(backoffLimit),
+ Completions: intToInt32(completions),
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: batchv1.JobStatus{
+ Succeeded: int32(succeeded),
+ Failed: int32(failed),
+ },
+ }
+}
+
+func intToInt32(i int) *int32 {
+ i32 := int32(i)
+ return &i32
+}
diff --git a/src/k8splugin/internal/statuscheck/resource.go b/src/k8splugin/internal/statuscheck/resource.go
new file mode 100644
index 00000000..598af2fb
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/resource.go
@@ -0,0 +1,85 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import "k8s.io/cli-runtime/pkg/resource"
+
+// ResourceList provides convenience methods for comparing collections of Infos.
+type ResourceList []*resource.Info
+
+// Append adds an Info to the Result.
+func (r *ResourceList) Append(val *resource.Info) {
+ *r = append(*r, val)
+}
+
+// Visit implements resource.Visitor.
+func (r ResourceList) Visit(fn resource.VisitorFunc) error {
+ for _, i := range r {
+ if err := fn(i, nil); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Filter returns a new Result with Infos that satisfy the predicate fn.
+func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList {
+ var result ResourceList
+ for _, i := range r {
+ if fn(i) {
+ result.Append(i)
+ }
+ }
+ return result
+}
+
+// Get returns the Info from the result that matches the name and kind.
+func (r ResourceList) Get(info *resource.Info) *resource.Info {
+ for _, i := range r {
+ if isMatchingInfo(i, info) {
+ return i
+ }
+ }
+ return nil
+}
+
+// Contains checks to see if an object exists.
+func (r ResourceList) Contains(info *resource.Info) bool {
+ for _, i := range r {
+ if isMatchingInfo(i, info) {
+ return true
+ }
+ }
+ return false
+}
+
+// Difference will return a new Result with objects not contained in rs.
+func (r ResourceList) Difference(rs ResourceList) ResourceList {
+ return r.Filter(func(info *resource.Info) bool {
+ return !rs.Contains(info)
+ })
+}
+
+// Intersect will return a new Result with objects contained in both Results.
+func (r ResourceList) Intersect(rs ResourceList) ResourceList {
+ return r.Filter(rs.Contains)
+}
+
+// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
+func isMatchingInfo(a, b *resource.Info) bool {
+ return a.Name == b.Name && a.Namespace == b.Namespace && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
+}
diff --git a/src/k8splugin/internal/statuscheck/resource_test.go b/src/k8splugin/internal/statuscheck/resource_test.go
new file mode 100644
index 00000000..532cebfd
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/resource_test.go
@@ -0,0 +1,61 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+ "testing"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+func TestResourceList(t *testing.T) {
+ mapping := &meta.RESTMapping{
+ Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"},
+ }
+
+ info := func(name string) *resource.Info {
+ return &resource.Info{Name: name, Mapping: mapping}
+ }
+
+ var r1, r2 ResourceList
+ r1 = []*resource.Info{info("foo"), info("bar")}
+ r2 = []*resource.Info{info("bar")}
+
+ if r1.Get(info("bar")).Mapping.Resource.Resource != "pod" {
+ t.Error("expected get pod")
+ }
+
+ diff := r1.Difference(r2)
+ if len(diff) != 1 {
+ t.Error("expected 1 result")
+ }
+
+ if !diff.Contains(info("foo")) {
+ t.Error("expected diff to return foo")
+ }
+
+ inter := r1.Intersect(r2)
+ if len(inter) != 1 {
+ t.Error("expected 1 result")
+ }
+
+ if !inter.Contains(info("bar")) {
+ t.Error("expected intersect to return bar")
+ }
+}
diff --git a/src/k8splugin/internal/statuscheck/wait.go b/src/k8splugin/internal/statuscheck/wait.go
new file mode 100644
index 00000000..41d90d91
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/wait.go
@@ -0,0 +1,109 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/pkg/errors"
+ appsv1 "k8s.io/api/apps/v1"
+ appsv1beta1 "k8s.io/api/apps/v1beta1"
+ appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+type waiter struct {
+ c ReadyChecker
+ timeout time.Duration
+ log func(string, ...interface{})
+}
+
+// waitForResources polls to get the current status of all pods, PVCs, Services and
+// Jobs(optional) until all are ready or a timeout is reached
+func (w *waiter) waitForResources(created ResourceList) error {
+ w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
+ defer cancel()
+
+ return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
+ for _, v := range created {
+ ready, err := w.c.IsReady(ctx, v)
+ if !ready || err != nil {
+ return false, err
+ }
+ }
+ return true, nil
+ }, ctx.Done())
+}
+
+// SelectorsForObject returns the pod label selector for a given object
+//
+// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
+func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
+ switch t := object.(type) {
+ case *extensionsv1beta1.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *corev1.ReplicationController:
+ selector = labels.SelectorFromSet(t.Spec.Selector)
+ case *appsv1.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta1.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *extensionsv1beta1.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *extensionsv1beta1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *batchv1.Job:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *corev1.Service:
+ if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 {
+ return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name)
+ }
+ selector = labels.SelectorFromSet(t.Spec.Selector)
+
+ default:
+ return nil, fmt.Errorf("selector for %T not implemented", object)
+ }
+
+ return selector, errors.Wrap(err, "invalid label selector")
+}