path: root/src/k8splugin/plugins
diff options
Diffstat (limited to 'src/k8splugin/plugins')
3 files changed, 283 insertions, 3 deletions
diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go
index f38fee78..f71c436c 100644
--- a/src/k8splugin/plugins/generic/plugin.go
+++ b/src/k8splugin/plugins/generic/plugin.go
@@ -1,5 +1,6 @@
Copyright 2018 Intel Corporation.
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -15,16 +16,35 @@ package main
import (
+ "fmt"
+ logger "log"
+ "time"
+ appsv1 "k8s.io/api/apps/v1"
+ "k8s.io/client-go/kubernetes"
+ //appsv1beta1 "k8s.io/api/apps/v1beta1"
+ //appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
pkgerrors "github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/rest"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ cachetools "k8s.io/client-go/tools/cache"
+ watchtools "k8s.io/client-go/tools/watch"
// Compile time check to see if genericPlugin implements the correct interface
@@ -36,6 +56,232 @@ var ExportedVariable genericPlugin
type genericPlugin struct {
+func (g genericPlugin) WatchUntilReady(
+ timeout time.Duration,
+ ns string,
+ res helm.KubernetesResource,
+ mapper meta.RESTMapper,
+ restClient rest.Interface,
+ objType runtime.Object,
+ clientSet kubernetes.Interface) error {
+ selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", res.Name))
+ if err != nil {
+ return err
+ }
+ mapping, err := mapper.RESTMapping(schema.GroupKind{
+ Group: res.GVK.Group,
+ Kind: res.GVK.Kind,
+ }, res.GVK.Version)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Preparing mapper based on GVK")
+ }
+ lw := cachetools.NewListWatchFromClient(restClient, mapping.Resource.Resource, ns, selector)
+ // What we watch for depends on the Kind.
+ // - For a Job, we watch for completion.
+ // - For all else, we watch until Ready.
+ // In the future, we might want to add some special logic for types
+ // like Ingress, Volume, etc.
+ ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
+ defer cancel()
+ _, err = watchtools.UntilWithSync(ctx, lw, objType, nil, func(e watch.Event) (bool, error) {
+ obj := e.Object
+ switch e.Type {
+ case watch.Added, watch.Modified:
+ // For things like a secret or a config map, this is the best indicator
+ // we get. We care mostly about jobs, where what we want to see is
+ // the status go into a good state.
+ logger.Printf("Add/Modify event for %s: %v", res.Name, e.Type)
+ switch res.GVK.Kind {
+ case "Job":
+ return g.waitForJob(obj, res.Name)
+ case "Pod":
+ return g.waitForPodSuccess(obj, res.Name)
+ case "Deployment":
+ return g.waitForDeploymentSuccess(obj, res.Name, clientSet)
+ case "DaemonSet":
+ return g.waitForDaemonSetSuccess(obj, res.Name)
+ case "StatefulSet":
+ return g.waitForStatefulSetSuccess(obj, res.Name)
+ }
+ return true, nil
+ case watch.Deleted:
+ logger.Printf("Deleted event for %s", res.Name)
+ return true, nil
+ case watch.Error:
+ // Handle error and return with an error.
+ logger.Printf("Error event for %s", res.Name)
+ return true, pkgerrors.New("failed to deploy " + res.Name)
+ default:
+ return false, nil
+ }
+ })
+ if err != nil {
+ logger.Printf("Error in Rss %s", res.Name)
+ return err
+ } else {
+ logger.Printf("Done for %s", res.Name)
+ return nil
+ }
+// waitForJob is a helper that waits for a job to complete.
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForJob(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*batchv1.Job)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *batch.Job, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+ for _, c := range o.Status.Conditions {
+ if c.Type == batchv1.JobComplete && c.Status == "True" {
+ return true, nil
+ } else if c.Type == batchv1.JobFailed && c.Status == "True" {
+ return true, pkgerrors.New("job failed: " + c.Reason)
+ }
+ }
+ logger.Printf("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
+ return false, nil
+// waitForPodSuccess is a helper that waits for a pod to complete.
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*corev1.Pod)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *v1.Pod, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+ switch o.Status.Phase {
+ case corev1.PodSucceeded:
+ logger.Printf("Pod %s succeeded", o.Name)
+ return true, nil
+ case corev1.PodFailed:
+ return true, pkgerrors.New("pod " + o.Name + " failed")
+ case corev1.PodPending:
+ logger.Printf("Pod %s pending", o.Name)
+ case corev1.PodRunning:
+ logger.Printf("Pod %s running", o.Name)
+ }
+ return false, nil
+// waitForDeploymentSuccess is a helper that waits for a deployment to run.
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForDeploymentSuccess(obj runtime.Object, name string, clientSet kubernetes.Interface) (bool, error) {
+ o, ok := obj.(*appsv1.Deployment)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.Deployment, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+ // If paused deployment will never be ready -> consider ready
+ if o.Spec.Paused {
+ logger.Printf("Depoyment %s is paused, consider ready", o.Name)
+ return true, nil
+ }
+ // Find RS associated with deployment
+ newReplicaSet, err := app.GetNewReplicaSet(o, clientSet.AppsV1())
+ if err != nil || newReplicaSet == nil {
+ return false, err
+ }
+ expectedReady := *o.Spec.Replicas - app.MaxUnavailable(*o)
+ if !(newReplicaSet.Status.ReadyReplicas >= expectedReady) {
+ logger.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, newReplicaSet.Status.ReadyReplicas, expectedReady)
+ return false, nil
+ }
+ return true, nil
+// waitForDaemonSetSuccess is a helper that waits for a daemonSet to run.
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForDaemonSetSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*appsv1.DaemonSet)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.DaemonSet, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
+ return true, nil
+ }
+ // Make sure all the updated pods have been scheduled
+ if o.Status.UpdatedNumberScheduled != o.Status.DesiredNumberScheduled {
+ logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedNumberScheduled, o.Status.DesiredNumberScheduled)
+ return false, nil
+ }
+ maxUnavailable, err := intstr.GetValueFromIntOrPercent(o.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(o.Status.DesiredNumberScheduled), true)
+ if err != nil {
+ // If for some reason the value is invalid, set max unavailable to the
+ // number of desired replicas. This is the same behavior as the
+ // `MaxUnavailable` function in deploymentutil
+ maxUnavailable = int(o.Status.DesiredNumberScheduled)
+ }
+ expectedReady := int(o.Status.DesiredNumberScheduled) - maxUnavailable
+ if !(int(o.Status.NumberReady) >= expectedReady) {
+ logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.NumberReady, expectedReady)
+ return false, nil
+ }
+ return true, nil
+// waitForStatefulSetSuccess is a helper that waits for a statefulSet to run.
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForStatefulSetSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*appsv1.StatefulSet)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.StatefulSet, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
+ return true, nil
+ }
+ // Dereference all the pointers because StatefulSets like them
+ var partition int
+ // 1 is the default for replicas if not set
+ var replicas = 1
+ // For some reason, even if the update strategy is a rolling update, the
+ // actual rollingUpdate field can be nil. If it is, we can safely assume
+ // there is no partition value
+ if o.Spec.UpdateStrategy.RollingUpdate != nil && o.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+ partition = int(*o.Spec.UpdateStrategy.RollingUpdate.Partition)
+ }
+ if o.Spec.Replicas != nil {
+ replicas = int(*o.Spec.Replicas)
+ }
+ // Because an update strategy can use partitioning, we need to calculate the
+ // number of updated replicas we should have. For example, if the replicas
+ // is set to 3 and the partition is 2, we'd expect only one pod to be
+ // updated
+ expectedReplicas := replicas - partition
+ // Make sure all the updated pods have been scheduled
+ if int(o.Status.UpdatedReplicas) != expectedReplicas {
+ logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedReplicas, expectedReplicas)
+ return false, nil
+ }
+ if int(o.Status.ReadyReplicas) != replicas {
+ logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.ReadyReplicas, replicas)
+ return false, nil
+ }
+ return true, nil
// Create generic object in a specific Kubernetes cluster
func (g genericPlugin) Create(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
if namespace == "" {
diff --git a/src/k8splugin/plugins/namespace/plugin.go b/src/k8splugin/plugins/namespace/plugin.go
index 851a5568..8732442e 100644
--- a/src/k8splugin/plugins/namespace/plugin.go
+++ b/src/k8splugin/plugins/namespace/plugin.go
@@ -1,5 +1,6 @@
Copyright 2018 Intel Corporation.
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -16,15 +17,20 @@ package main
import (
+ "time"
pkgerrors "github.com/pkg/errors"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
// Compile time check to see if namespacePlugin implements the correct interface
@@ -36,6 +42,17 @@ var ExportedVariable namespacePlugin
type namespacePlugin struct {
+func (g namespacePlugin) WatchUntilReady(
+ timeout time.Duration,
+ ns string,
+ res helm.KubernetesResource,
+ mapper meta.RESTMapper,
+ restClient rest.Interface,
+ objType runtime.Object,
+ clientSet kubernetes.Interface) error {
+ return pkgerrors.Errorf("This function is not implemented in this plugin")
// Create a namespace object in a specific Kubernetes cluster
func (p namespacePlugin) Create(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
namespaceObj := &coreV1.Namespace{
diff --git a/src/k8splugin/plugins/service/plugin.go b/src/k8splugin/plugins/service/plugin.go
index ba1decbb..aa5c685c 100644
--- a/src/k8splugin/plugins/service/plugin.go
+++ b/src/k8splugin/plugins/service/plugin.go
@@ -1,5 +1,6 @@
Copyright 2018 Intel Corporation.
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -16,16 +17,21 @@ package main
import (
+ "time"
pkgerrors "github.com/pkg/errors"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
// Compile time check to see if servicePlugin implements the correct interface
@@ -37,6 +43,17 @@ var ExportedVariable servicePlugin
type servicePlugin struct {
+func (g servicePlugin) WatchUntilReady(
+ timeout time.Duration,
+ ns string,
+ res helm.KubernetesResource,
+ mapper meta.RESTMapper,
+ restClient rest.Interface,
+ objType runtime.Object,
+ clientSet kubernetes.Interface) error {
+ return pkgerrors.Errorf("This function is not implemented in this plugin")
// Create a service object in a specific Kubernetes cluster
func (p servicePlugin) Create(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
if namespace == "" {