aboutsummaryrefslogtreecommitdiffstats
path: root/src/k8splugin/plugins/generic/plugin.go
diff options
context:
space:
mode:
authorhthieu <huu_trung.thieu@nokia-bell-labs.com>2021-07-01 20:03:09 +0200
committerhthieu <huu_trung.thieu@nokia-bell-labs.com>2021-08-11 15:02:27 +0200
commit57d1305db9f032c94949b719f0dc052ac7cd2d41 (patch)
tree1249fbd139622900c4f0d04bfacf52836e471d31 /src/k8splugin/plugins/generic/plugin.go
parent6875d67ee2ad879170774304dd35d9a14dd9f50c (diff)
Support pre/post install/delete hooks  
Update instance create and delete handler to support pre/post install/delete hooks.  Add hook.go: to execute and delete hooks (based on the delete policy).  Implement watchUntilReady in generic plugin to wait for readiness of hook resources. Add hook_sorter.go: to sort hooks based on weight. Users can define a timeout for each type of hook in overwrite-values. The variable names are k8s-rb-instance-pre-install-timeout (default 60s), k8s-rb-instance-post-install-timeout (default 600s), k8s-rb-instance-pre-delete-timeout (default 60s) and k8s-rb-instance-post-delete-timeout (default 600s). This is the timeout for each hook of a hook event (not a total time). Add recovery capability to continue the execution of instantiation (create or delete) when the plugin stops unexpectedly. For now, this is disabled because we have a data-race issue during test. Will enable when we find the solution. Add basic tests for hooks (in hook_test.go). Add tests for hooks in instance_test. For an instance get request, we can request full data by adding a query param to the request: full=true. Issue-ID: MULTICLOUD-1347 Signed-off-by: hthieu <huu_trung.thieu@nokia-bell-labs.com> Change-Id: If2b4a90831b9bfce1af8b926e4062a7d706bee08
Diffstat (limited to 'src/k8splugin/plugins/generic/plugin.go')
-rw-r--r--src/k8splugin/plugins/generic/plugin.go248
1 files changed, 247 insertions, 1 deletions
diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go
index 61d84fe3..f71c436c 100644
--- a/src/k8splugin/plugins/generic/plugin.go
+++ b/src/k8splugin/plugins/generic/plugin.go
@@ -1,5 +1,6 @@
/*
Copyright 2018 Intel Corporation.
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -15,16 +16,35 @@ package main
import (
"context"
- "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ "fmt"
+ logger "log"
+ "time"
+
+ appsv1 "k8s.io/api/apps/v1"
+ "k8s.io/client-go/kubernetes"
+ //appsv1beta1 "k8s.io/api/apps/v1beta1"
+ //appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
+
pkgerrors "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/rest"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ cachetools "k8s.io/client-go/tools/cache"
+ watchtools "k8s.io/client-go/tools/watch"
)
// Compile time check to see if genericPlugin implements the correct interface
@@ -36,6 +56,232 @@ var ExportedVariable genericPlugin
type genericPlugin struct {
}
+func (g genericPlugin) WatchUntilReady(
+ timeout time.Duration,
+ ns string,
+ res helm.KubernetesResource,
+ mapper meta.RESTMapper,
+ restClient rest.Interface,
+ objType runtime.Object,
+ clientSet kubernetes.Interface) error {
+ selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", res.Name))
+ if err != nil {
+ return err
+ }
+
+ mapping, err := mapper.RESTMapping(schema.GroupKind{
+ Group: res.GVK.Group,
+ Kind: res.GVK.Kind,
+ }, res.GVK.Version)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Preparing mapper based on GVK")
+ }
+ lw := cachetools.NewListWatchFromClient(restClient, mapping.Resource.Resource, ns, selector)
+
+ // What we watch for depends on the Kind.
+ // - For a Job, we watch for completion.
+ // - For all else, we watch until Ready.
+ // In the future, we might want to add some special logic for types
+ // like Ingress, Volume, etc.
+ ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
+ defer cancel()
+
+ _, err = watchtools.UntilWithSync(ctx, lw, objType, nil, func(e watch.Event) (bool, error) {
+ obj := e.Object
+ switch e.Type {
+ case watch.Added, watch.Modified:
+ // For things like a secret or a config map, this is the best indicator
+ // we get. We care mostly about jobs, where what we want to see is
+ // the status go into a good state.
+ logger.Printf("Add/Modify event for %s: %v", res.Name, e.Type)
+ switch res.GVK.Kind {
+ case "Job":
+ return g.waitForJob(obj, res.Name)
+ case "Pod":
+ return g.waitForPodSuccess(obj, res.Name)
+ case "Deployment":
+ return g.waitForDeploymentSuccess(obj, res.Name, clientSet)
+ case "DaemonSet":
+ return g.waitForDaemonSetSuccess(obj, res.Name)
+ case "StatefulSet":
+ return g.waitForStatefulSetSuccess(obj, res.Name)
+ }
+ return true, nil
+ case watch.Deleted:
+ logger.Printf("Deleted event for %s", res.Name)
+ return true, nil
+ case watch.Error:
+ // Handle error and return with an error.
+ logger.Printf("Error event for %s", res.Name)
+ return true, pkgerrors.New("failed to deploy " + res.Name)
+ default:
+ return false, nil
+ }
+ })
+ if err != nil {
+ logger.Printf("Error in resource %s", res.Name)
+ return err
+ } else {
+ logger.Printf("Done for %s", res.Name)
+ return nil
+ }
+}
+
+// waitForJob is a helper that waits for a job to complete.
+//
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForJob(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*batchv1.Job)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *batch.Job, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+
+ for _, c := range o.Status.Conditions {
+ if c.Type == batchv1.JobComplete && c.Status == "True" {
+ return true, nil
+ } else if c.Type == batchv1.JobFailed && c.Status == "True" {
+ return true, pkgerrors.New("job failed: " + c.Reason)
+ }
+ }
+
+ logger.Printf("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
+ return false, nil
+}
+
+// waitForPodSuccess is a helper that waits for a pod to complete.
+//
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*corev1.Pod)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *v1.Pod, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+
+ switch o.Status.Phase {
+ case corev1.PodSucceeded:
+ logger.Printf("Pod %s succeeded", o.Name)
+ return true, nil
+ case corev1.PodFailed:
+ return true, pkgerrors.New("pod " + o.Name + " failed")
+ case corev1.PodPending:
+ logger.Printf("Pod %s pending", o.Name)
+ case corev1.PodRunning:
+ logger.Printf("Pod %s running", o.Name)
+ }
+
+ return false, nil
+}
+
+// waitForDeploymentSuccess is a helper that waits for a deployment to run.
+//
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForDeploymentSuccess(obj runtime.Object, name string, clientSet kubernetes.Interface) (bool, error) {
+ o, ok := obj.(*appsv1.Deployment)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.Deployment, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+
+ // If paused deployment will never be ready -> consider ready
+ if o.Spec.Paused {
+ logger.Printf("Deployment %s is paused, consider ready", o.Name)
+ return true, nil
+ }
+
+ // Find RS associated with deployment
+ newReplicaSet, err := app.GetNewReplicaSet(o, clientSet.AppsV1())
+ if err != nil || newReplicaSet == nil {
+ return false, err
+ }
+ expectedReady := *o.Spec.Replicas - app.MaxUnavailable(*o)
+ if !(newReplicaSet.Status.ReadyReplicas >= expectedReady) {
+ logger.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, newReplicaSet.Status.ReadyReplicas, expectedReady)
+ return false, nil
+ }
+ return true, nil
+}
+
+// waitForDaemonSetSuccess is a helper that waits for a daemonSet to run.
+//
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForDaemonSetSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*appsv1.DaemonSet)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.DaemonSet, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
+ return true, nil
+ }
+
+ // Make sure all the updated pods have been scheduled
+ if o.Status.UpdatedNumberScheduled != o.Status.DesiredNumberScheduled {
+ logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedNumberScheduled, o.Status.DesiredNumberScheduled)
+ return false, nil
+ }
+ maxUnavailable, err := intstr.GetValueFromIntOrPercent(o.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(o.Status.DesiredNumberScheduled), true)
+ if err != nil {
+ // If for some reason the value is invalid, set max unavailable to the
+ // number of desired replicas. This is the same behavior as the
+ // `MaxUnavailable` function in deploymentutil
+ maxUnavailable = int(o.Status.DesiredNumberScheduled)
+ }
+
+ expectedReady := int(o.Status.DesiredNumberScheduled) - maxUnavailable
+ if !(int(o.Status.NumberReady) >= expectedReady) {
+ logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.NumberReady, expectedReady)
+ return false, nil
+ }
+ return true, nil
+}
+
+// waitForStatefulSetSuccess is a helper that waits for a statefulSet to run.
+//
+// This operates on an event returned from a watcher.
+func (g genericPlugin) waitForStatefulSetSuccess(obj runtime.Object, name string) (bool, error) {
+ o, ok := obj.(*appsv1.StatefulSet)
+ if !ok {
+ return true, pkgerrors.New("expected " + name + " to be a *apps.StatefulSet, got " + obj.GetObjectKind().GroupVersionKind().Kind)
+ }
+
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
+ return true, nil
+ }
+
+ // Dereference all the pointers because StatefulSets like them
+ var partition int
+ // 1 is the default for replicas if not set
+ var replicas = 1
+ // For some reason, even if the update strategy is a rolling update, the
+ // actual rollingUpdate field can be nil. If it is, we can safely assume
+ // there is no partition value
+ if o.Spec.UpdateStrategy.RollingUpdate != nil && o.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+ partition = int(*o.Spec.UpdateStrategy.RollingUpdate.Partition)
+ }
+ if o.Spec.Replicas != nil {
+ replicas = int(*o.Spec.Replicas)
+ }
+
+ // Because an update strategy can use partitioning, we need to calculate the
+ // number of updated replicas we should have. For example, if the replicas
+ // is set to 3 and the partition is 2, we'd expect only one pod to be
+ // updated
+ expectedReplicas := replicas - partition
+
+ // Make sure all the updated pods have been scheduled
+ if int(o.Status.UpdatedReplicas) != expectedReplicas {
+ logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedReplicas, expectedReplicas)
+ return false, nil
+ }
+
+ if int(o.Status.ReadyReplicas) != replicas {
+ logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.ReadyReplicas, replicas)
+ return false, nil
+ }
+ return true, nil
+}
+
// Create generic object in a specific Kubernetes cluster
func (g genericPlugin) Create(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
if namespace == "" {