diff options
author | hthieu <huu_trung.thieu@nokia-bell-labs.com> | 2021-07-01 20:03:09 +0200 |
---|---|---|
committer | hthieu <huu_trung.thieu@nokia-bell-labs.com> | 2021-08-11 15:02:27 +0200 |
commit | 57d1305db9f032c94949b719f0dc052ac7cd2d41 (patch) | |
tree | 1249fbd139622900c4f0d04bfacf52836e471d31 /src/k8splugin/plugins/generic/plugin.go | |
parent | 6875d67ee2ad879170774304dd35d9a14dd9f50c (diff) |
Support pre/post install/delete hooks
Update instance create and delete handler to support pre/post install/delete hooks.
Add hook.go: to execute and delete hook (base on delete policy).
Implement watchUntilReady in generic plugin to wait for readiness of hook rss.
Add hook_sorter.go: to sort hook based on weight.
User can define timeout for each type of hooks in overwrite-values. Variable name is k8s-rb-instance-pre-install-timeout (default 60s),
k8s-rb-instance-post-install-timeout (default 600s), k8s-rb-instance-pre-delete-timeout (default 60s) and k8s-rb-instance-post-delete-timeout (600s). This is timeout
for each hook of a hook event (not a total time).
Add recovery capability to continue the execution of instantiation (create or delete) when the plugin stop unexpectedly.
For now, this is disabled because we have data-race issue during test. Will enable when we find the solution.
Add basic test for hooks (in hook_test.go)
Add test for hook in instance_test
For instance get request, we can request for full data by adding query param to the request: full=true.
Issue-ID: MULTICLOUD-1347
Signed-off-by: hthieu <huu_trung.thieu@nokia-bell-labs.com>
Change-Id: If2b4a90831b9bfce1af8b926e4062a7d706bee08
Diffstat (limited to 'src/k8splugin/plugins/generic/plugin.go')
-rw-r--r-- | src/k8splugin/plugins/generic/plugin.go | 248 |
1 files changed, 247 insertions, 1 deletions
diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go index 61d84fe3..f71c436c 100644 --- a/src/k8splugin/plugins/generic/plugin.go +++ b/src/k8splugin/plugins/generic/plugin.go @@ -1,5 +1,6 @@ /* Copyright 2018 Intel Corporation. +Copyright © 2021 Nokia Bell Labs. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -15,16 +16,35 @@ package main import ( "context" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + "fmt" + logger "log" + "time" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/client-go/kubernetes" + //appsv1beta1 "k8s.io/api/apps/v1beta1" + //appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + pkgerrors "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + cachetools "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" ) // Compile time check to see if genericPlugin implements the correct interface @@ -36,6 +56,232 @@ var ExportedVariable genericPlugin type genericPlugin struct { } +func (g genericPlugin) WatchUntilReady( + timeout time.Duration, + ns string, + res helm.KubernetesResource, + mapper meta.RESTMapper, + restClient rest.Interface, + objType runtime.Object, + clientSet kubernetes.Interface) error { + selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", res.Name)) + if err != nil { + return err + } + + mapping, err := mapper.RESTMapping(schema.GroupKind{ + Group: res.GVK.Group, + Kind: res.GVK.Kind, + }, res.GVK.Version) + if err != nil { + return pkgerrors.Wrap(err, "Preparing mapper based on GVK") + } + lw := cachetools.NewListWatchFromClient(restClient, mapping.Resource.Resource, ns, selector) + + // What we watch for depends on the Kind. + // - For a Job, we watch for completion. + // - For all else, we watch until Ready. + // In the future, we might want to add some special logic for types + // like Ingress, Volume, etc. + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + + _, err = watchtools.UntilWithSync(ctx, lw, objType, nil, func(e watch.Event) (bool, error) { + obj := e.Object + switch e.Type { + case watch.Added, watch.Modified: + // For things like a secret or a config map, this is the best indicator + // we get. We care mostly about jobs, where what we want to see is + // the status go into a good state. + logger.Printf("Add/Modify event for %s: %v", res.Name, e.Type) + switch res.GVK.Kind { + case "Job": + return g.waitForJob(obj, res.Name) + case "Pod": + return g.waitForPodSuccess(obj, res.Name) + case "Deployment": + return g.waitForDeploymentSuccess(obj, res.Name, clientSet) + case "DaemonSet": + return g.waitForDaemonSetSuccess(obj, res.Name) + case "StatefulSet": + return g.waitForStatefulSetSuccess(obj, res.Name) + } + return true, nil + case watch.Deleted: + logger.Printf("Deleted event for %s", res.Name) + return true, nil + case watch.Error: + // Handle error and return with an error. + logger.Printf("Error event for %s", res.Name) + return true, pkgerrors.New("failed to deploy " + res.Name) + default: + return false, nil + } + }) + if err != nil { + logger.Printf("Error in Rss %s", res.Name) + return err + } else { + logger.Printf("Done for %s", res.Name) + return nil + } +} + +// waitForJob is a helper that waits for a job to complete. +// +// This operates on an event returned from a watcher. +func (g genericPlugin) waitForJob(obj runtime.Object, name string) (bool, error) { + o, ok := obj.(*batchv1.Job) + if !ok { + return true, pkgerrors.New("expected " + name + " to be a *batch.Job, got " + obj.GetObjectKind().GroupVersionKind().Kind) + } + + for _, c := range o.Status.Conditions { + if c.Type == batchv1.JobComplete && c.Status == "True" { + return true, nil + } else if c.Type == batchv1.JobFailed && c.Status == "True" { + return true, pkgerrors.New("job failed: " + c.Reason) + } + } + + logger.Printf("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded) + return false, nil +} + +// waitForPodSuccess is a helper that waits for a pod to complete. +// +// This operates on an event returned from a watcher. +func (g genericPlugin) waitForPodSuccess(obj runtime.Object, name string) (bool, error) { + o, ok := obj.(*corev1.Pod) + if !ok { + return true, pkgerrors.New("expected " + name + " to be a *v1.Pod, got " + obj.GetObjectKind().GroupVersionKind().Kind) + } + + switch o.Status.Phase { + case corev1.PodSucceeded: + logger.Printf("Pod %s succeeded", o.Name) + return true, nil + case corev1.PodFailed: + return true, pkgerrors.New("pod " + o.Name + " failed") + case corev1.PodPending: + logger.Printf("Pod %s pending", o.Name) + case corev1.PodRunning: + logger.Printf("Pod %s running", o.Name) + } + + return false, nil +} + +// waitForDeploymentSuccess is a helper that waits for a deployment to run. +// +// This operates on an event returned from a watcher. +func (g genericPlugin) waitForDeploymentSuccess(obj runtime.Object, name string, clientSet kubernetes.Interface) (bool, error) { + o, ok := obj.(*appsv1.Deployment) + if !ok { + return true, pkgerrors.New("expected " + name + " to be a *apps.Deployment, got " + obj.GetObjectKind().GroupVersionKind().Kind) + } + + // If paused deployment will never be ready -> consider ready + if o.Spec.Paused { + logger.Printf("Depoyment %s is paused, consider ready", o.Name) + return true, nil + } + + // Find RS associated with deployment + newReplicaSet, err := app.GetNewReplicaSet(o, clientSet.AppsV1()) + if err != nil || newReplicaSet == nil { + return false, err + } + expectedReady := *o.Spec.Replicas - app.MaxUnavailable(*o) + if !(newReplicaSet.Status.ReadyReplicas >= expectedReady) { + logger.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, newReplicaSet.Status.ReadyReplicas, expectedReady) + return false, nil + } + return true, nil +} + +// waitForDaemonSetSuccess is a helper that waits for a daemonSet to run. +// +// This operates on an event returned from a watcher. +func (g genericPlugin) waitForDaemonSetSuccess(obj runtime.Object, name string) (bool, error) { + o, ok := obj.(*appsv1.DaemonSet) + if !ok { + return true, pkgerrors.New("expected " + name + " to be a *apps.DaemonSet, got " + obj.GetObjectKind().GroupVersionKind().Kind) + } + + // If the update strategy is not a rolling update, there will be nothing to wait for + if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType { + return true, nil + } + + // Make sure all the updated pods have been scheduled + if o.Status.UpdatedNumberScheduled != o.Status.DesiredNumberScheduled { + logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedNumberScheduled, o.Status.DesiredNumberScheduled) + return false, nil + } + maxUnavailable, err := intstr.GetValueFromIntOrPercent(o.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(o.Status.DesiredNumberScheduled), true) + if err != nil { + // If for some reason the value is invalid, set max unavailable to the + // number of desired replicas. This is the same behavior as the + // `MaxUnavailable` function in deploymentutil + maxUnavailable = int(o.Status.DesiredNumberScheduled) + } + + expectedReady := int(o.Status.DesiredNumberScheduled) - maxUnavailable + if !(int(o.Status.NumberReady) >= expectedReady) { + logger.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.NumberReady, expectedReady) + return false, nil + } + return true, nil +} + +// waitForStatefulSetSuccess is a helper that waits for a statefulSet to run. +// +// This operates on an event returned from a watcher. +func (g genericPlugin) waitForStatefulSetSuccess(obj runtime.Object, name string) (bool, error) { + o, ok := obj.(*appsv1.StatefulSet) + if !ok { + return true, pkgerrors.New("expected " + name + " to be a *apps.StatefulSet, got " + obj.GetObjectKind().GroupVersionKind().Kind) + } + + // If the update strategy is not a rolling update, there will be nothing to wait for + if o.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return true, nil + } + + // Dereference all the pointers because StatefulSets like them + var partition int + // 1 is the default for replicas if not set + var replicas = 1 + // For some reason, even if the update strategy is a rolling update, the + // actual rollingUpdate field can be nil. If it is, we can safely assume + // there is no partition value + if o.Spec.UpdateStrategy.RollingUpdate != nil && o.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + partition = int(*o.Spec.UpdateStrategy.RollingUpdate.Partition) + } + if o.Spec.Replicas != nil { + replicas = int(*o.Spec.Replicas) + } + + // Because an update strategy can use partitioning, we need to calculate the + // number of updated replicas we should have. For example, if the replicas + // is set to 3 and the partition is 2, we'd expect only one pod to be + // updated + expectedReplicas := replicas - partition + + // Make sure all the updated pods have been scheduled + if int(o.Status.UpdatedReplicas) != expectedReplicas { + logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", o.Namespace, o.Name, o.Status.UpdatedReplicas, expectedReplicas) + return false, nil + } + + if int(o.Status.ReadyReplicas) != replicas { + logger.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", o.Namespace, o.Name, o.Status.ReadyReplicas, replicas) + return false, nil + } + return true, nil +} + // Create generic object in a specific Kubernetes cluster func (g genericPlugin) Create(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) { if namespace == "" { |