diff options
Diffstat (limited to 'src/k8splugin/internal/app')
-rw-r--r-- | src/k8splugin/internal/app/client.go | 162 | ||||
-rw-r--r-- | src/k8splugin/internal/app/client_test.go | 2 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config.go | 21 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_backend.go | 43 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_test.go | 45 | ||||
-rw-r--r-- | src/k8splugin/internal/app/deploymentutil.go | 178 | ||||
-rw-r--r-- | src/k8splugin/internal/app/hook.go | 183 | ||||
-rw-r--r-- | src/k8splugin/internal/app/hook_sorter.go | 50 | ||||
-rw-r--r-- | src/k8splugin/internal/app/hook_test.go | 264 | ||||
-rw-r--r-- | src/k8splugin/internal/app/instance.go | 596 | ||||
-rw-r--r-- | src/k8splugin/internal/app/instance_test.go | 99 | ||||
-rw-r--r-- | src/k8splugin/internal/app/query.go | 108 |
12 files changed, 1670 insertions, 81 deletions
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index 00fd8e97..9813333e 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -1,6 +1,8 @@ /* Copyright 2018 Intel Corporation. Copyright © 2021 Samsung Electronics +Copyright © 2021 Orange +Copyright © 2021 Nokia Bell Labs. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,6 +20,15 @@ package app import ( "context" "io/ioutil" + appsv1 "k8s.io/api/apps/v1" + //appsv1beta1 "k8s.io/api/apps/v1beta1" + //appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + //extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + //apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" "time" @@ -27,10 +38,10 @@ import ( "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" + logger "log" pkgerrors "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -61,6 +72,137 @@ type ResourceStatus struct { Status unstructured.Unstructured `json:"status"` } +func (k *KubernetesClient) getObjTypeForHook(kind string) (runtime.Object, error) { + switch kind { + case "Job": + return &batchv1.Job{}, nil + case "Pod": + return &corev1.Pod{}, nil + case "Deployment": + return &appsv1.Deployment{}, nil + case "DaemonSet": + return &appsv1.DaemonSet{}, nil + case "StatefulSet": + return &appsv1.StatefulSet{}, nil + default: + return nil, pkgerrors.New("kind " + kind + " unknown") + } +} + +func (k *KubernetesClient) getRestApi(apiVersion string) (rest.Interface, error) { + //based on kubectl api-versions + switch apiVersion { + case "admissionregistration.k8s.io/v1": + return k.clientSet.AdmissionregistrationV1().RESTClient(), nil + case "admissionregistration.k8s.io/v1beta1": + return k.clientSet.AdmissionregistrationV1beta1().RESTClient(), nil + case "apps/v1": + return k.clientSet.AppsV1().RESTClient(), nil + case "apps/v1beta1": + return k.clientSet.AppsV1beta1().RESTClient(), nil + case "apps/v1beta2": + return k.clientSet.AppsV1beta2().RESTClient(), nil + case "authentication.k8s.io/v1": + return k.clientSet.AuthenticationV1().RESTClient(), nil + case "authentication.k8s.io/v1beta1": + return k.clientSet.AuthenticationV1beta1().RESTClient(), nil + case "authorization.k8s.io/v1": + return k.clientSet.AuthorizationV1().RESTClient(), nil + case "authorization.k8s.io/v1beta1": + return k.clientSet.AuthorizationV1beta1().RESTClient(), nil + case "autoscaling/v1": + return k.clientSet.AutoscalingV1().RESTClient(), nil + case "autoscaling/v2beta1": + return k.clientSet.AutoscalingV2beta1().RESTClient(), nil + case "autoscaling/v2beta2": + return k.clientSet.AutoscalingV2beta2().RESTClient(), nil + case "batch/v1": + return k.clientSet.BatchV1().RESTClient(), nil + case "batch/v1beta1": + return k.clientSet.BatchV1beta1().RESTClient(), nil + case "certificates.k8s.io/v1": + return k.clientSet.CertificatesV1().RESTClient(), nil + case "certificates.k8s.io/v1beta1": + return k.clientSet.CertificatesV1beta1().RESTClient(), nil + case "coordination.k8s.io/v1": + return k.clientSet.CoordinationV1().RESTClient(), nil + case "coordination.k8s.io/v1beta1": + return k.clientSet.CoordinationV1beta1().RESTClient(), nil + case "v1": + return k.clientSet.CoreV1().RESTClient(), nil + case "discovery.k8s.io/v1beta1": + return k.clientSet.DiscoveryV1beta1().RESTClient(), nil + case "events.k8s.io/v1": + return k.clientSet.EventsV1().RESTClient(), nil + case "events.k8s.io/v1beta1": + return k.clientSet.EventsV1beta1().RESTClient(), nil + case "extensions/v1beta1": + return k.clientSet.ExtensionsV1beta1().RESTClient(), nil + case "flowcontrol.apiserver.k8s.io/v1alpha1": + return k.clientSet.FlowcontrolV1alpha1().RESTClient(), nil + case "networking.k8s.io/v1": + return k.clientSet.NetworkingV1().RESTClient(), nil + case "networking.k8s.io/v1beta1": + return k.clientSet.NetworkingV1beta1().RESTClient(), nil + case "node.k8s.io/v1alpha1": + return k.clientSet.NodeV1alpha1().RESTClient(), nil + case "node.k8s.io/v1beta1": + return k.clientSet.NodeV1beta1().RESTClient(), nil + case "policy/v1beta1": + return k.clientSet.PolicyV1beta1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1": + return k.clientSet.RbacV1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1alpha1": + return k.clientSet.RbacV1alpha1().RESTClient(), nil + case "rbac.authorization.k8s.io/v1beta1": + return k.clientSet.RbacV1beta1().RESTClient(), nil + case "scheduling.k8s.io/v1": + return k.clientSet.SchedulingV1().RESTClient(), nil + case "scheduling.k8s.io/v1alpha1": + return k.clientSet.SchedulingV1alpha1().RESTClient(), nil + case "scheduling.k8s.io/v1beta1": + return k.clientSet.SchedulingV1beta1().RESTClient(), nil + case "storage.k8s.io/v1": + return k.clientSet.StorageV1().RESTClient(), nil + case "storage.k8s.io/v1alpha1": + return k.clientSet.StorageV1alpha1().RESTClient(), nil + case "storage.k8s.io/v1beta1": + return k.clientSet.StorageV1beta1().RESTClient(), nil + default: + return nil, pkgerrors.New("Api version " + apiVersion + " unknown") + } +} + +func (k *KubernetesClient) WatchHookUntilReady(timeout time.Duration, ns string, res helm.KubernetesResource) error { + //for now, only generic plugin has dedicated WatchUntilReady implemented. Later, we can implement this function + //for each plugin separately. + pluginImpl, err := plugin.GetPluginByKind("generic") + if err != nil { + return pkgerrors.Wrap(err, "Error loading plugin") + } + + mapper := k.GetMapper() + apiVersion, kind := res.GVK.ToAPIVersionAndKind() + if apiVersion == "" { + //apiVersion is empty -> we can suppose that the rss is ready + logger.Printf("apiVersion is empty, consider that the rss is ready") + return nil + } + objType, err := k.getObjTypeForHook(kind) + if err != nil { + //have error from getObjTypeForHook -> this kind is not considered in hook -> consider ready + return nil + } + + logger.Printf("apiVersion: %s, Kind: %s", apiVersion, kind) + restClient, err := k.getRestApi(apiVersion) + if err != nil { + return pkgerrors.Wrap(err, "Get rest client") + } + + return pluginImpl.WatchUntilReady(timeout, ns, res, mapper, restClient, objType, k.clientSet) +} + // getPodsByLabel yields status of all pods under given instance ID func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) { client := k.GetStandardClient().CoreV1().Pods(namespace) @@ -121,9 +263,11 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names return nil, pkgerrors.Wrap(err, "Querying for resources") } - resp := make([]ResourceStatus, len(unstrList.Items)) + resp := make([]ResourceStatus, 0) for _, unstr := range unstrList.Items { - resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr}) + if unstr.GetName() != "" { + resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr}) + } } return resp, nil } @@ -276,8 +420,7 @@ func (k *KubernetesClient) ensureNamespace(namespace string) error { return nil } -func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, - namespace string) (helm.KubernetesResource, error) { +func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) { if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) { return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists") @@ -317,7 +460,7 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) { if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) { - return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists") + return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + " does not exists") } log.Info("Processing Kubernetes Resource", log.Fields{ @@ -353,16 +496,17 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate, func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate, namespace string) ([]helm.KubernetesResource, error) { + var createdResources []helm.KubernetesResource + err := k.ensureNamespace(namespace) if err != nil { - return nil, pkgerrors.Wrap(err, "Creating Namespace") + return createdResources, pkgerrors.Wrap(err, "Creating Namespace") } - var createdResources []helm.KubernetesResource for _, resTempl := range sortedTemplates { resCreated, err := k.CreateKind(resTempl, namespace) if err != nil { - return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK) + return createdResources, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK) } createdResources = append(createdResources, resCreated) } diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go index 6db541a4..0ba244d2 100644 --- a/src/k8splugin/internal/app/client_test.go +++ b/src/k8splugin/internal/app/client_test.go @@ -15,13 +15,13 @@ package app import ( "encoding/base64" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "os" "plugin" "reflect" "testing" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go index d0f8876d..94acadcc 100644 --- a/src/k8splugin/internal/app/config.go +++ b/src/k8splugin/internal/app/config.go @@ -42,7 +42,7 @@ type ConfigResult struct { ProfileName string `json:"profile-name"` ConfigName string `json:"config-name"` TemplateName string `json:"template-name"` - ConfigVersion uint `json:"config-verion"` + ConfigVersion uint `json:"config-version"` } //ConfigRollback input @@ -62,6 +62,7 @@ type ConfigTagit struct { type ConfigManager interface { Create(instanceID string, p Config) (ConfigResult, error) Get(instanceID, configName string) (Config, error) + List(instanceID string) ([]Config, error) Help() map[string]string Update(instanceID, configName string, p Config) (ConfigResult, error) Delete(instanceID, configName string) (ConfigResult, error) @@ -225,6 +226,24 @@ func (v *ConfigClient) Get(instanceID, configName string) (Config, error) { return cfg, nil } +// List config entry in the database +func (v *ConfigClient) List(instanceID string) ([]Config, error) { + + // Acquire per profile Mutex + lock, _ := getProfileData(instanceID) + lock.Lock() + defer lock.Unlock() + // Read Config DB + cs := ConfigStore{ + instanceID: instanceID, + } + cfg, err := cs.getConfigList() + if err != nil { + return []Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") + } + return cfg, nil +} + // Delete the Config from database func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) { diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index e2f802c7..30a480df 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -170,6 +170,33 @@ func (c ConfigStore) getConfig() (Config, error) { return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") } +// Read the config entry in the database +func (c ConfigStore) getConfigList() ([]Config, error) { + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return []Config{}, pkgerrors.Wrap(err, "Retrieving model info") + } + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig) + values, err := db.Etcd.GetAll(cfgKey) + if err != nil { + return []Config{}, pkgerrors.Wrap(err, "Get Config DB List") + } + //value is a byte array + if values != nil { + result := make([]Config, 0) + for _, value := range values { + cfg := Config{} + err = db.DeSerialize(string(value), &cfg) + if err != nil { + return []Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value") + } + result = append(result, cfg) + } + return result, nil + } + return []Config{}, pkgerrors.Wrap(err, "Get Config DB List") +} + // Delete the config entry in the database func (c ConfigStore) deleteConfig() (Config, error) { @@ -353,12 +380,12 @@ func (c ConfigVersionStore) decrementVersion() error { // Apply Config func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error { - rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) + rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID) if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } // Get Template and Resolve the template with values - crl, err := resolve(rbName, rbVersion, profileName, p) + crl, err := resolve(rbName, rbVersion, profileName, p, releaseName) if err != nil { return pkgerrors.Wrap(err, "Resolve Config") } @@ -436,7 +463,7 @@ func scheduleResources(c chan configResourceList) { //Resolve returns the path where the helm chart merged with //configuration overrides resides. -var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { +var resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) { var resTemplates []helm.KubernetesResourceTemplate @@ -483,9 +510,17 @@ var resolve = func(rbName, rbVersion, profileName string, p Config) (configResou return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template") } + var finalReleaseName string + + if releaseName == "" { + finalReleaseName = profile.ReleaseName + } else { + finalReleaseName = releaseName + } + helmClient := helm.NewTemplateClient(profile.KubernetesVersion, profile.Namespace, - profile.ReleaseName) + finalReleaseName) chartPath := filepath.Join(chartBasePath, t.ChartName) resTemplates, _, err = helmClient.GenerateKubernetesArtifacts(chartPath, diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go index 028895d7..9ee96881 100644 --- a/src/k8splugin/internal/app/config_test.go +++ b/src/k8splugin/internal/app/config_test.go @@ -19,10 +19,11 @@ package app import ( "fmt" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "reflect" "strings" "testing" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" // pkgerrors "github.com/pkg/errors" ) @@ -90,7 +91,7 @@ func TestCreateConfig(t *testing.T) { db.Etcd = testCase.mockdb db.DBconn = provideMockModelData(testCase.instanceID, testCase.rbName, testCase.rbVersion, testCase.profileName) - resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) { return configResourceList{}, nil } impl := NewConfigClient() @@ -104,7 +105,7 @@ func TestCreateConfig(t *testing.T) { } } else { if reflect.DeepEqual(testCase.expected, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + t.Errorf("Create returned unexpected body: got %v;"+ " expected %v", got, testCase.expected) } } @@ -203,7 +204,7 @@ func TestRollbackConfig(t *testing.T) { db.Etcd = testCase.mockdb db.DBconn = provideMockModelData(testCase.instanceID, testCase.rbName, testCase.rbVersion, testCase.profileName) - resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) { return configResourceList{}, nil } impl := NewConfigClient() @@ -217,10 +218,38 @@ func TestRollbackConfig(t *testing.T) { } } else { if reflect.DeepEqual(testCase.expected1, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + t.Errorf("Create returned unexpected body: got %v;"+ " expected %v", got, testCase.expected1) } } + get, err := impl.Get(testCase.instanceID, testCase.inp.ConfigName) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Get returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Get returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.inp, get) == false { + t.Errorf("Get returned unexpected body: got %v;"+ + " expected %v", get, testCase.inp) + } + } + getList, err := impl.List(testCase.instanceID) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("List returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("List returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual([]Config{testCase.inp}, getList) == false { + t.Errorf("List returned unexpected body: got %v;"+ + " expected %v", getList, []Config{testCase.inp}) + } + } got, err = impl.Update(testCase.instanceID, testCase.inp.ConfigName, testCase.inpUpdate1) if err != nil { if testCase.expectedError == "" { @@ -231,7 +260,7 @@ func TestRollbackConfig(t *testing.T) { } } else { if reflect.DeepEqual(testCase.expected2, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + t.Errorf("Create returned unexpected body: got %v;"+ " expected %v", got, testCase.expected2) } } @@ -245,7 +274,7 @@ func TestRollbackConfig(t *testing.T) { } } else { if reflect.DeepEqual(testCase.expected3, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + t.Errorf("Create returned unexpected body: got %v;"+ " expected %v", got, testCase.expected3) } } @@ -259,7 +288,7 @@ func TestRollbackConfig(t *testing.T) { } } else { if reflect.DeepEqual(testCase.expected4, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + t.Errorf("Create returned unexpected body: got %v;"+ " expected %v", got, testCase.expected4) } } diff --git a/src/k8splugin/internal/app/deploymentutil.go b/src/k8splugin/internal/app/deploymentutil.go new file mode 100644 index 00000000..e945b055 --- /dev/null +++ b/src/k8splugin/internal/app/deploymentutil.go @@ -0,0 +1,178 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "sort" + + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + appsclient "k8s.io/client-go/kubernetes/typed/apps/v1" +) + +// deploymentutil contains a copy of a few functions from Kubernetes controller code to avoid a dependency on k8s.io/kubernetes. +// This code is copied from https://github.com/kubernetes/kubernetes/blob/e856613dd5bb00bcfaca6974431151b5c06cbed5/pkg/controller/deployment/util/deployment_util.go +// No changes to the code were made other than removing some unused functions + +// RsListFunc returns the ReplicaSet from the ReplicaSet namespace and the List metav1.ListOptions. +type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error) + +// ListReplicaSets returns a slice of RSes the given deployment targets. +// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan), +// because only the controller itself should do that. +// However, it does filter out anything whose ControllerRef doesn't match. +func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) { + // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector + // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830. + namespace := deployment.Namespace + selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := metav1.ListOptions{LabelSelector: selector.String()} + all, err := getRSList(namespace, options) + if err != nil { + return nil, err + } + // Only include those whose ControllerRef matches the Deployment. + owned := make([]*apps.ReplicaSet, 0, len(all)) + for _, rs := range all { + if metav1.IsControlledBy(rs, deployment) { + owned = append(owned, rs) + } + } + return owned, nil +} + +// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker. +type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet + +func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) } +func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) +} + +// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template). +func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet { + sort.Sort(ReplicaSetsByCreationTimestamp(rsList)) + for i := range rsList { + if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) { + // In rare cases, such as after cluster upgrades, Deployment may end up with + // having more than one new ReplicaSets that have the same template as its template, + // see https://github.com/kubernetes/kubernetes/issues/40415 + // We deterministically choose the oldest new ReplicaSet. + return rsList[i] + } + } + // new ReplicaSet does not exist. + return nil +} + +// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash] +// We ignore pod-template-hash because: +// 1. The hash result would be different upon podTemplateSpec API changes +// (e.g. the addition of a new field will cause the hash code to change) +// 2. The deployment template won't have hash labels +func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool { + t1Copy := template1.DeepCopy() + t2Copy := template2.DeepCopy() + // Remove hash labels from template.Labels before comparing + delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) + delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey) + return apiequality.Semantic.DeepEqual(t1Copy, t2Copy) +} + +// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface. +// Returns nil if the new replica set doesn't exist yet. +func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) { + rsList, err := ListReplicaSets(deployment, RsListFromClient(c)) + if err != nil { + return nil, err + } + return FindNewReplicaSet(deployment, rsList), nil +} + +// RsListFromClient returns an rsListFunc that wraps the given client. +func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc { + return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) { + rsList, err := c.ReplicaSets(namespace).List(context.Background(), options) + if err != nil { + return nil, err + } + var ret []*apps.ReplicaSet + for i := range rsList.Items { + ret = append(ret, &rsList.Items[i]) + } + return ret, err + } +} + +// IsRollingUpdate returns true if the strategy type is a rolling update. +func IsRollingUpdate(deployment *apps.Deployment) bool { + return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType +} + +// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take. +func MaxUnavailable(deployment apps.Deployment) int32 { + if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 { + return int32(0) + } + // Error caught by validation + _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas)) + if maxUnavailable > *deployment.Spec.Replicas { + return *deployment.Spec.Replicas + } + return maxUnavailable +} + +// ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one +// step. For example: +// +// 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1) +// 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1) +// 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) +// 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1) +// 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1) +// 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1) +func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) { + surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true) + if err != nil { + return 0, 0, err + } + unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false) + if err != nil { + return 0, 0, err + } + + if surge == 0 && unavailable == 0 { + // Validation should never allow the user to explicitly use zero values for both maxSurge + // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero. + // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the + // theory that surge might not work due to quota. + unavailable = 1 + } + + return int32(surge), int32(unavailable), nil +} diff --git a/src/k8splugin/internal/app/hook.go b/src/k8splugin/internal/app/hook.go new file mode 100644 index 00000000..ebf5f8e3 --- /dev/null +++ b/src/k8splugin/internal/app/hook.go @@ -0,0 +1,183 @@ +/* +Copyright © 2021 Nokia Bell Labs +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "fmt" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + "helm.sh/helm/v3/pkg/release" + "log" + "strings" + "time" +) + +// Timeout used when deleting resources with a hook-delete-policy. +const defaultHookDeleteTimeoutInSeconds = int64(60) + +// HookClient implements the Helm Hook interface +type HookClient struct { + kubeNameSpace string + id string + dbStoreName string + dbTagInst string +} + +type MultiCloudHook struct{ + release.Hook + Group string + Version string +} + +// NewHookClient returns a new instance of HookClient +func NewHookClient(namespace, id, dbStoreName, dbTagInst string) *HookClient { + return &HookClient{ + kubeNameSpace: namespace, + id: id, + dbStoreName: dbStoreName, + dbTagInst: dbTagInst, + } +} + +func (hc *HookClient) getHookByEvent(hs []*helm.Hook, hook release.HookEvent) []*helm.Hook { + hooks := []*helm.Hook{} + for _, h := range hs { + for _, e := range h.Hook.Events { + if e == hook { + hooks = append(hooks, h) + } + } + } + return hooks +} + +// Mimic function ExecHook in helm/pkg/tiller/release_server.go +func (hc *HookClient) ExecHook( + k8sClient KubernetesClient, + hs []*helm.Hook, + hook release.HookEvent, + timeout int64, + startIndex int, + dbData *InstanceDbData) (error){ + executingHooks := hc.getHookByEvent(hs, hook) + key := InstanceKey{ + ID: hc.id, + } + log.Printf("Executing %d %s hook(s) for instance %s", len(executingHooks), hook, hc.id) + executingHooks = sortByHookWeight(executingHooks) + + for index, h := range executingHooks { + if index < startIndex { + continue + } + // Set default delete policy to before-hook-creation + if h.Hook.DeletePolicies == nil || len(h.Hook.DeletePolicies) == 0 { + h.Hook.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} + } + if err := hc.deleteHookByPolicy(h, release.HookBeforeHookCreation, k8sClient); err != nil { + return err + } + //update DB here before the creation of the hook, if the plugin quits + //-> when it comes back, it will continue from next hook and consider that this one is done + if dbData != nil { + dbData.HookProgress = fmt.Sprintf("%d/%d", index + 1, len(executingHooks)) + err := db.DBconn.Update(hc.dbStoreName, key, hc.dbTagInst, dbData) + if err != nil { + return err + } + } + log.Printf(" Instance: %s, Creating %s hook %s, index %d", hc.id, hook, h.Hook.Name, index) + resTempl := helm.KubernetesResourceTemplate{ + GVK: h.KRT.GVK, + FilePath: h.KRT.FilePath, + } + createdHook, err := k8sClient.CreateKind(resTempl, hc.kubeNameSpace) + if err != nil { + log.Printf(" Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err) + hc.deleteHookByPolicy(h, release.HookFailed, k8sClient) + return err + } + if hook != "crd-install" { + //timeout <= 0 -> do not wait + if timeout > 0 { + // Watch hook resources until they are completed + err = k8sClient.WatchHookUntilReady(time.Duration(timeout)*time.Second, hc.kubeNameSpace, createdHook) + if err != nil { + // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted + // under failed condition. If so, then clear the corresponding resource object in the hook + if err := hc.deleteHookByPolicy(h, release.HookFailed, k8sClient); err != nil { + return err + } + return err + } + } + } else { + //Do not handle CRD Hooks + } + } + + for _, h := range executingHooks { + if err := hc.deleteHookByPolicy(h, release.HookSucceeded, k8sClient); err != nil { + log.Printf(" Instance: %s, Warning: Error deleting %s hook %s based on delete policy, continue", hc.id, hook, h.Hook.Name) + return err + } + } + log.Printf("%d %s hook(s) complete for release %s", len(executingHooks), hook, hc.id) + return nil +} + +func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error { + rss := helm.KubernetesResource{ + GVK: h.KRT.GVK, + Name: h.Hook.Name, + } + if hookHasDeletePolicy(h, policy) { + log.Printf(" Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy) + if errHookDelete := k8sClient.deleteResources(append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil { + if strings.Contains(errHookDelete.Error(), "not found") { + return nil + } else { + log.Printf(" Instance: %s, Warning: hook %s, filePath %s could not be deleted: %s", hc.id, h.Hook.Name, h.KRT.FilePath ,errHookDelete) + return errHookDelete + } + } else { + //Verify that the rss is deleted + isDeleted := false + for !isDeleted { + log.Printf(" Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) + if _, err := k8sClient.GetResourceStatus(rss, hc.kubeNameSpace); err != nil { + if strings.Contains(err.Error(), "not found") { + log.Printf(" Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) + return nil + } else { + isDeleted = true + } + } + time.Sleep(5 * time.Second) + } + } + } + return nil +} + +// hookHasDeletePolicy determines whether the defined hook deletion policy matches the hook deletion polices +// supported by helm. If so, mark the hook as one should be deleted. +func hookHasDeletePolicy(h *helm.Hook, policy release.HookDeletePolicy) bool { + for _, v := range h.Hook.DeletePolicies { + if policy == v { + return true + } + } + return false +}
\ No newline at end of file diff --git a/src/k8splugin/internal/app/hook_sorter.go b/src/k8splugin/internal/app/hook_sorter.go new file mode 100644 index 00000000..fa6a9830 --- /dev/null +++ b/src/k8splugin/internal/app/hook_sorter.go @@ -0,0 +1,50 @@ +/* +Copyright © 2021 Nokia Bell Labs +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + sortLib "sort" +) + +// sortByHookWeight does an in-place sort of hooks by their supplied weight. +func sortByHookWeight(hooks []*helm.Hook) []*helm.Hook { + hs := newHookWeightSorter(hooks) + sortLib.Sort(hs) + return hs.hooks +} + +type hookWeightSorter struct { + hooks []*helm.Hook +} + +func newHookWeightSorter(h []*helm.Hook) *hookWeightSorter { + return &hookWeightSorter{ + hooks: h, + } +} + +func (hs *hookWeightSorter) Len() int { return len(hs.hooks) } + +func (hs *hookWeightSorter) Swap(i, j int) { + hs.hooks[i], hs.hooks[j] = hs.hooks[j], hs.hooks[i] +} + +func (hs *hookWeightSorter) Less(i, j int) bool { + if hs.hooks[i].Hook.Weight == hs.hooks[j].Hook.Weight { + return hs.hooks[i].Hook.Name < hs.hooks[j].Hook.Name + } + return hs.hooks[i].Hook.Weight < hs.hooks[j].Hook.Weight +} + diff --git a/src/k8splugin/internal/app/hook_test.go b/src/k8splugin/internal/app/hook_test.go new file mode 100644 index 00000000..9c63194e --- /dev/null +++ b/src/k8splugin/internal/app/hook_test.go @@ -0,0 +1,264 @@ +/* +Copyright © 2021 Nokia Bell Labs. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "encoding/base64" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/time" + "io/ioutil" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) + +func generateHookList() []*helm.Hook { + var hookList []*helm.Hook + preInstallHook1 := helm.Hook{ + Hook: release.Hook{ + Name : "preinstall1", + Kind : "Job", + Path : "", + Manifest : "", + Events : []release.HookEvent{release.HookPreInstall}, + LastRun : release.HookExecution{ + StartedAt: time.Now(), + CompletedAt: time.Now(), + Phase: "", + }, + Weight : -5, + DeletePolicies : []release.HookDeletePolicy{}, + }, + KRT: helm.KubernetesResourceTemplate{ + GVK: schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "Job", + }, + FilePath: "../../mock_files/mock_yamls/job.yaml", + }, + } + preInstallHook2 := helm.Hook{ + Hook: release.Hook{ + Name : "preinstall2", + Kind : "Deployment", + Path : "", + Manifest : "", + Events : []release.HookEvent{release.HookPreInstall}, + LastRun : release.HookExecution{ + StartedAt: time.Now(), + CompletedAt: time.Now(), + Phase: "", + }, + Weight : 0, + DeletePolicies : []release.HookDeletePolicy{}, + }, + KRT: helm.KubernetesResourceTemplate{ + GVK: schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "Job", + }, + FilePath: "../../mock_files/mock_yamls/job.yaml", + }, + } + postInstallHook := helm.Hook{ + Hook: release.Hook{ + Name : "postinstall", + Kind : "Job", + Path : "", + Manifest : "", + Events : []release.HookEvent{release.HookPostInstall}, + LastRun : release.HookExecution{ + StartedAt: time.Now(), + CompletedAt: time.Now(), + Phase: "", + }, + Weight : -5, + DeletePolicies : []release.HookDeletePolicy{}, + }, + KRT: helm.KubernetesResourceTemplate{ + GVK: schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "Job", + }, + FilePath: "../../mock_files/mock_yamls/job.yaml", + }, + } + preDeleteHook := helm.Hook{ + Hook: release.Hook{ + Name : "predelete", + Kind : "Job", + Path : "", + Manifest : "", + Events : []release.HookEvent{release.HookPreDelete}, + LastRun : release.HookExecution{ + StartedAt: time.Now(), + CompletedAt: time.Now(), + Phase: "", + }, + Weight : -5, + DeletePolicies : []release.HookDeletePolicy{}, + }, + KRT: helm.KubernetesResourceTemplate{ + GVK: schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "Job", + }, + FilePath: "../../mock_files/mock_yamls/job.yaml", + }, + } + postDeleteHook := helm.Hook{ + Hook: release.Hook{ + Name : "postdelete", + Kind : "Job", + Path : "", + Manifest : "", + Events : []release.HookEvent{release.HookPostDelete}, + LastRun : release.HookExecution{ + StartedAt: time.Now(), + CompletedAt: time.Now(), + Phase: "", + }, + Weight : -5, + DeletePolicies : []release.HookDeletePolicy{}, + }, + KRT: helm.KubernetesResourceTemplate{ + GVK: schema.GroupVersionKind{ + Group: "batch", + Version: "v1", + Kind: "Job", + }, + FilePath: "../../mock_files/mock_yamls/job.yaml", + }, + } + hookList = append(hookList, &preInstallHook2) + hookList = append(hookList, &preInstallHook1) + hookList = append(hookList, &postInstallHook) + hookList = append(hookList, &preDeleteHook) + hookList = append(hookList, &postDeleteHook) + + return hookList +} + +func TestGetHookByEvent(t *testing.T) { + hookList := generateHookList() + hookClient := NewHookClient("test", "test", "rbdef", "instance") + t.Run("Get pre-install hook", func(t *testing.T) { + preinstallList := hookClient.getHookByEvent(hookList, release.HookPreInstall) + if len(preinstallList) != 2 { + t.Fatalf("TestGetHookByEvent error: expected=2 preinstall hook, result= %d", len(preinstallList)) + } + if preinstallList[0].Hook.Name != "preinstall2" { + t.Fatalf("TestGetHookByEvent error: expect name of 1st preinstall hook is preinstall2, result= %s", preinstallList[0].Hook.Name) + } + if preinstallList[1].Hook.Name != "preinstall1" { + t.Fatalf("TestGetHookByEvent error: expect name of 2nd preinstall hook is preinstall1, result= %s", preinstallList[0].Hook.Name) + } + }) + t.Run("Get post-install hook", func(t *testing.T) { + postinstallList := hookClient.getHookByEvent(hookList, release.HookPostInstall) + if len(postinstallList) != 1 { + t.Fatalf("TestGetHookByEvent error: expected=1 postinstall hook, result= %d", len(postinstallList)) + } + if postinstallList[0].Hook.Name != "postinstall" { + t.Fatalf("TestGetHookByEvent error: expect name of 1st postinstall hook is postinstall, result= %s", postinstallList[0].Hook.Name) + } + }) + t.Run("Get pre-delete hook", func(t *testing.T) { + predeleteList := hookClient.getHookByEvent(hookList, release.HookPreDelete) + if len(predeleteList) != 1 { + t.Fatalf("TestGetHookByEvent error: expected=1 predelete hook, result= %d", len(predeleteList)) + } + if predeleteList[0].Hook.Name != "predelete" { + t.Fatalf("TestGetHookByEvent error: expect name of 1st predelete hook is predelete, result= %s", predeleteList[0].Hook.Name) + } + }) + t.Run("Get post-delete hook", func(t *testing.T) { + postdeleteList := hookClient.getHookByEvent(hookList, release.HookPostDelete) + if len(postdeleteList) != 1 { + t.Fatalf("TestGetHookByEvent error: expected=1 postdelete hook, result= %d", len(postdeleteList)) + } + if postdeleteList[0].Hook.Name != "postdelete" { + t.Fatalf("TestGetHookByEvent error: expect name of 1st postdelete hook is postdelete, result= %s", postdeleteList[0].Hook.Name) + } + }) +} + +func TestShortHook(t *testing.T) { + hookList := generateHookList() + hookClient := NewHookClient("test", "test", "rbdef", "instance") + preinstallList := hookClient.getHookByEvent(hookList, release.HookPreInstall) + t.Run("Short pre-install hook", func(t *testing.T) { + shortedHooks := sortByHookWeight(preinstallList) + if shortedHooks[0].Hook.Name != "preinstall1" { + t.Fatalf("TestShortHook error: expect name of 1st preinstall hook is preinstall1, result= %s", preinstallList[0].Hook.Name) + } + if shortedHooks[1].Hook.Name != "preinstall2" { + t.Fatalf("TestShortHook error: expect name of 2nd preinstall hook is preinstall2, result= %s", preinstallList[0].Hook.Name) + } + }) +} + +func TestExecHook(t *testing.T) { + hookList := generateHookList() + hookClient := NewHookClient("test", "test", "rbdef", "instance") + err := LoadMockPlugins(utils.LoadedPlugins) + if err != nil { + t.Fatalf("LoadMockPlugins returned an error (%s)", err) + } + + // Load the mock kube config file into memory + fd, err := ioutil.ReadFile("../../mock_files/mock_configs/mock_kube_config") + if err != nil { + t.Fatal("Unable to read mock_kube_config") + } + db.DBconn = &db.MockDB{ + Items: map[string]map[string][]byte{ + connection.ConnectionKey{CloudRegion: "mock_connection"}.String(): { + "metadata": []byte( + "{\"cloud-region\":\"mock_connection\"," + + "\"cloud-owner\":\"mock_owner\"," + + "\"kubeconfig\": \"" + base64.StdEncoding.EncodeToString(fd) + "\"}"), + }, + }, + } + + k8sClient := KubernetesClient{} + err = k8sClient.Init("mock_connection", "test") + if err != nil { + t.Fatal(err.Error()) + } + err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall,10,0, nil) + if err != nil { + t.Fatal(err.Error()) + } + err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall,10,0, nil) + if err != nil { + t.Fatal(err.Error()) + } + err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete,10,0, nil) + if err != nil { + t.Fatal(err.Error()) + } + err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete,10,0, nil) + if err != nil { + t.Fatal(err.Error()) + } +}
\ No newline at end of file diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index c1ec35b6..5aa60882 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -1,6 +1,8 @@ /* * Copyright 2018 Intel Corporation, Inc * Copyright © 2021 Samsung Electronics + * Copyright © 2021 Orange + * Copyright © 2021 Nokia Bell Labs * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,16 +20,29 @@ package app import ( + "context" "encoding/json" + "log" + "strings" + "strconv" + "time" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/resource" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator" "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb" - "k8s.io/apimachinery/pkg/runtime/schema" - "log" - "strings" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck" pkgerrors "github.com/pkg/errors" + "helm.sh/helm/v3/pkg/release" ) // InstanceRequest contains the parameters needed for instantiation @@ -52,6 +67,22 @@ type InstanceResponse struct { Hooks []*helm.Hook `json:"-"` } +// InstanceDbData contains the data to put to Db +type InstanceDbData struct { + ID string `json:"id"` + Request InstanceRequest `json:"request"` + Namespace string `json:"namespace"` + Status string `json:"status"` + ReleaseName string `json:"release-name"` + Resources []helm.KubernetesResource `json:"resources"` + Hooks []*helm.Hook `json:"hooks"` + HookProgress string `json:"hook-progress"` + PreInstallTimeout int64 `json:"PreInstallTimeout"` + PostInstallTimeout int64 `json:"PostInstallTimeout"` + PreDeleteTimeout int64 `json:"PreDeleteTimeout"` + PostDeleteTimeout int64 `json:"PostDeleteTimeout"` +} + // InstanceMiniResponse contains the response from instantiation // It does NOT include the created resources. // Use the regular GET to get the created resources for a particular instance @@ -74,11 +105,13 @@ type InstanceStatus struct { type InstanceManager interface { Create(i InstanceRequest) (InstanceResponse, error) Get(id string) (InstanceResponse, error) + GetFull(id string) (InstanceDbData, error) Status(id string) (InstanceStatus, error) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) Delete(id string) error + RecoverCreateOrDelete(id string) error } // InstanceKey is used as the primary key in the db @@ -100,13 +133,16 @@ func (dk InstanceKey) String() string { // InstanceClient implements the InstanceManager interface // It will also be used to maintain some localized state type InstanceClient struct { - storeName string - tagInst string + storeName string + tagInst string } // NewInstanceClient returns an instance of the InstanceClient // which implements the InstanceManager func NewInstanceClient() *InstanceClient { + //TODO: Call RecoverCreateOrDelete to perform recovery when the plugin restart. + //Not implement here now because We have issue with current test set (date race) + return &InstanceClient{ storeName: "rbdef", tagInst: "instance", @@ -125,7 +161,6 @@ func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName // Create an instance of rb on the cluster in the database func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { - // Name is required if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" { return InstanceResponse{}, @@ -141,10 +176,52 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { //Convert override values from map to array of strings of the following format //foo=bar overrideValues := []string{} + var preInstallTimeOut, postInstallTimeOut, preDeleteTimeout, postDeleteTimeout int64 if i.OverrideValues != nil { + preInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-install-timeout"] + if !ok { + preInstallTimeOutStr = "60" + } + preInstallTimeOut,err = strconv.ParseInt(preInstallTimeOutStr, 10, 64) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-install-timeout") + } + + postInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-install-timeout"] + if !ok { + postInstallTimeOutStr = "600" + } + postInstallTimeOut,err = strconv.ParseInt(postInstallTimeOutStr, 10, 64) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-install-timeout") + } + + preDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-delete-timeout"] + if !ok { + preDeleteTimeOutStr = "60" + } + preDeleteTimeout,err = strconv.ParseInt(preDeleteTimeOutStr, 10, 64) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-delete-timeout") + } + + postDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-delete-timeout"] + if !ok { + postDeleteTimeOutStr = "600" + } + postDeleteTimeout,err = strconv.ParseInt(postDeleteTimeOutStr, 10, 64) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-delete-timeout") + } + for k, v := range i.OverrideValues { overrideValues = append(overrideValues, k+"="+v) } + } else { + preInstallTimeOut = 60 + postInstallTimeOut = 600 + preDeleteTimeout = 60 + postDeleteTimeout = 600 } //Execute the kubernetes create command @@ -162,11 +239,93 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } - createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace) + log.Printf("Main rss info") + for _,t := range sortedTemplates { + log.Printf(" Path: %s", t.FilePath) + log.Printf(" Kind: %s", t.GVK.Kind) + } + + log.Printf("Hook info") + for _,h := range hookList { + log.Printf(" Name: %s", h.Hook.Name) + log.Printf(" Events: %s", h.Hook.Events) + log.Printf(" Weight: %d", h.Hook.Weight) + log.Printf(" DeletePolicies: %s", h.Hook.DeletePolicies) + } + dbData := InstanceDbData{ + ID: id, + Request: i, + Namespace: profile.Namespace, + ReleaseName: releaseName, + Status: "PRE-INSTALL", + Resources: []helm.KubernetesResource{}, + Hooks: hookList, + HookProgress: "", + PreInstallTimeout: preInstallTimeOut, + PostInstallTimeout: postInstallTimeOut, + PreDeleteTimeout: preDeleteTimeout, + PostDeleteTimeout: postDeleteTimeout, + } + + key := InstanceKey{ + ID: id, + } + err = db.DBconn.Create(v.storeName, key, v.tagInst, dbData) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry") + } + + err = k8sClient.ensureNamespace(profile.Namespace) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") + } + + hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst) + if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 { + err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, preInstallTimeOut, 0, &dbData) + if err != nil { + log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err) + err2 := db.DBconn.Delete(v.storeName, key, v.tagInst) + if err2 != nil { + log.Printf("Error cleaning failed instance in DB, please check DB.") + } + return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preinstall hooks") + } + } + + dbData.Status = "CREATING" + err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData) + if err != nil { + err = db.DBconn.Delete(v.storeName, key, v.tagInst) + if err != nil { + log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) + } + return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") + } + + //Main rss creation is supposed to be very quick -> no need to support recover for main rss + createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace); if err != nil { + if len(createdResources) > 0 { + log.Printf("[Instance] Reverting created resources on Error: %s", err.Error()) + k8sClient.deleteResources(createdResources, profile.Namespace) + } + log.Printf(" Instance: %s, Main rss are failed, skip post-install and remove instance in DB", id) + //main rss creation failed -> remove instance in DB + err = db.DBconn.Delete(v.storeName, key, v.tagInst) + if err != nil { + log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) + } return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources") } + dbData.Status = "CREATED" + dbData.Resources = createdResources + err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData) + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") + } + //Compose the return response resp := InstanceResponse{ ID: id, @@ -177,15 +336,71 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { Hooks: hookList, } + if len(hookClient.getHookByEvent(hookList, release.HookPostInstall)) != 0 { + go func() { + dbData.Status = "POST-INSTALL" + dbData.HookProgress = "" + err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, postInstallTimeOut, 0, &dbData) + if err != nil { + dbData.Status = "POST-INSTALL-FAILED" + log.Printf(" Instance: %s, Error running postinstall hooks error: %s", id, err) + } else { + dbData.Status = "DONE" + } + err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", releaseName) + } + }() + } else { + dbData.Status = "DONE" + err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", releaseName) + } + } + + return resp, nil +} + +// Get returns the full instance for corresponding ID +func (v *InstanceClient) GetFull(id string) (InstanceDbData, error) { key := InstanceKey{ ID: id, } - err = db.DBconn.Create(v.storeName, key, v.tagInst, resp) + value, err := db.DBconn.Read(v.storeName, key, v.tagInst) if err != nil { - return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry") + return InstanceDbData{}, pkgerrors.Wrap(err, "Get Instance") } - return resp, nil + //value is a byte array + if value != nil { + resp := InstanceDbData{} + err = db.DBconn.Unmarshal(value, &resp) + if err != nil { + return InstanceDbData{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value") + } + //In case that we are communicating with an old db, some field will missing -> fill it with default value + if resp.Status == "" { + //For instance that is in Db -> consider it's DONE + resp.Status = "DONE" + } + if resp.PreInstallTimeout == 0 { + resp.PreInstallTimeout = 60 + } + if resp.PostInstallTimeout == 0 { + resp.PostInstallTimeout = 600 + } + if resp.PreDeleteTimeout == 0 { + resp.PreInstallTimeout = 60 + } + if resp.PostDeleteTimeout == 0 { + resp.PostDeleteTimeout = 600 + } + return resp, nil + } + + return InstanceDbData{}, pkgerrors.New("Error getting Instance") } // Get returns the instance for corresponding ID @@ -214,6 +429,7 @@ func (v *InstanceClient) Get(id string) (InstanceResponse, error) { // Query returns state of instance's filtered resources func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) { + queryClient := NewQueryClient() //Read the status from the DB key := InstanceKey{ ID: id, @@ -231,54 +447,21 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value") } - k8sClient := KubernetesClient{} - err = k8sClient.Init(resResp.Request.CloudRegion, id) + resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id) if err != nil { - return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") - } - - var resourcesStatus []ResourceStatus - if labels != "" { - resList, err := k8sClient.queryResources(apiVersion, kind, labels, resResp.Namespace) - if err != nil { - return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") - } - // If user specifies both label and name, we want to pick up only single resource from these matching label - if name != "" { - //Assigning 0-length, because we may actually not find matching name - resourcesStatus = make([]ResourceStatus, 0) - for _, res := range resList { - if res.Name == name { - resourcesStatus = append(resourcesStatus, res) - break - } - } - } else { - resourcesStatus = resList - } - } else if name != "" { - resIdentifier := helm.KubernetesResource{ - Name: name, - GVK: schema.FromAPIVersionAndKind(apiVersion, kind), - } - res, err := k8sClient.GetResourceStatus(resIdentifier, resResp.Namespace) - if err != nil { - return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resource") - } - resourcesStatus = []ResourceStatus{res} + return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") } resp := InstanceStatus{ Request: resResp.Request, - ResourceCount: int32(len(resourcesStatus)), - ResourcesStatus: resourcesStatus, + ResourceCount: resources.ResourceCount, + ResourcesStatus: resources.ResourcesStatus, } return resp, nil } // Status returns the status for the instance func (v *InstanceClient) Status(id string) (InstanceStatus, error) { - //Read the status from the DB key := InstanceKey{ ID: id, @@ -294,7 +477,7 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) { return InstanceStatus{}, pkgerrors.New("Status is not available") } - resResp := InstanceResponse{} + resResp := InstanceDbData{} err = db.DBconn.Unmarshal(value, &resResp) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value") @@ -312,25 +495,36 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) } + isReady := true generalStatus := make([]ResourceStatus, 0, len(resResp.Resources)) Main: - for _, resource := range resResp.Resources { + for _, oneResource := range resResp.Resources { for _, pod := range podsStatus { - if resource.GVK == pod.GVK && resource.Name == pod.Name { + if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name { continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart } } - status, err := k8sClient.GetResourceStatus(resource, resResp.Namespace) + status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace) if err != nil { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) + isReady = false } else { generalStatus = append(generalStatus, status) + ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status) + + if !ready || err != nil { + isReady = false + if err != nil { + cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) + } + } } } + //We still need to iterate through rss list even the status is not DONE, to gather status of rss + pod for the response resp := InstanceStatus{ Request: resResp.Request, ResourceCount: int32(len(generalStatus) + len(podsStatus)), - Ready: false, //FIXME To determine readiness, some parsing of status fields is necessary + Ready: isReady && resResp.Status == "DONE", ResourcesStatus: append(generalStatus, podsStatus...), } @@ -344,6 +538,70 @@ Main: return resp, nil } +func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error) { + readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second) + defer cancel() + + apiVersion, kind := rss.GVK.ToAPIVersionAndKind() + log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind) + + var parsedRes runtime.Object + //TODO: Should we care about different api version for a same kind? + switch kind { + case "Pod": + parsedRes = new(corev1.Pod) + case "Job": + parsedRes = new(batchv1.Job) + case "Deployment": + parsedRes = new(appsv1.Deployment) + case "PersistentVolumeClaim": + parsedRes = new(corev1.PersistentVolume) + case "Service": + parsedRes = new(corev1.Service) + case "DaemonSet": + parsedRes = new(appsv1.DaemonSet) + case "CustomResourceDefinition": + parsedRes = new(apiextv1.CustomResourceDefinition) + case "StatefulSet": + parsedRes = new(appsv1.StatefulSet) + case "ReplicationController": + parsedRes = new(corev1.ReplicationController) + case "ReplicaSet": + parsedRes = new(appsv1.ReplicaSet) + default: + //For not listed resource, consider ready + return true, nil + } + + restClient, err := k8sClient.getRestApi(apiVersion) + if err != nil { + return false, err + } + mapper := k8sClient.GetMapper() + mapping, err := mapper.RESTMapping(schema.GroupKind{ + Group: rss.GVK.Group, + Kind: rss.GVK.Kind, + }, rss.GVK.Version) + resourceInfo := resource.Info{ + Client: restClient, + Mapping: mapping, + Namespace: namespace, + Name: rss.Name, + Source: "", + Object: nil, + ResourceVersion: "", + } + + err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes) + if err != nil { + return false, err + } + resourceInfo.Object = parsedRes + ready, err := readyChecker.IsReady(ctx, &resourceInfo) + return ready, err +} + // List returns the instance for corresponding ID // Empty string returns all func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) { @@ -358,7 +616,7 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance for key, value := range dbres { //value is a byte array if value != nil { - resp := InstanceResponse{} + resp := InstanceDbData{} err = db.DBconn.Unmarshal(value, &resp) if err != nil { log.Printf("[Instance] Error: %s Unmarshaling Instance: %s", err.Error(), key) @@ -385,6 +643,11 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance continue } + if resp.Status == "PRE-INSTALL" { + //DO not add instance which is in pre-install phase + continue + } + results = append(results, miniresp) } } @@ -423,7 +686,6 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab if add { ret = append(ret, resp) } - } return ret, nil @@ -431,29 +693,249 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab // Delete the Instance from database func (v *InstanceClient) Delete(id string) error { - inst, err := v.Get(id) + inst, err := v.GetFull(id) if err != nil { return pkgerrors.Wrap(err, "Error getting Instance") } + key := InstanceKey{ + ID: id, + } + if inst.Status == "DELETED" { + //The instance is deleted when the plugin comes back -> just remove from Db + err = db.DBconn.Delete(v.storeName, key, v.tagInst) + if err != nil { + log.Printf("Delete Instance DB Entry for release %s has error.", inst.ReleaseName) + } + return nil + } else if inst.Status != "DONE"{ + //Recover is ongoing, do nothing here + return nil + } k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) if err != nil { return pkgerrors.Wrap(err, "Getting CloudRegion Information") } + inst.Status = "PRE-DELETE" + inst.HookProgress = "" + err = db.DBconn.Update(v.storeName, key, v.tagInst, inst) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName) + } + + hookClient := NewHookClient(inst.Namespace, id, v.storeName, v.tagInst) + if len(hookClient.getHookByEvent(inst.Hooks, release.HookPreDelete)) != 0 { + err = hookClient.ExecHook(k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst) + if err != nil { + log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err) + inst.Status = "PRE-DELETE-FAILED" + err2 := db.DBconn.Update(v.storeName, key, v.tagInst, inst) + if err2 != nil { + log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName) + } + return pkgerrors.Wrap(err, "Error running pre-delete hooks") + } + } + + inst.Status = "DELETING" + err = db.DBconn.Update(v.storeName, key, v.tagInst, inst) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName) + } err = k8sClient.deleteResources(inst.Resources, inst.Namespace) if err != nil { return pkgerrors.Wrap(err, "Deleting Instance Resources") } + if len(hookClient.getHookByEvent(inst.Hooks, release.HookPostDelete)) != 0 { + go func() { + inst.HookProgress = "" + if err := v.runPostDelete(k8sClient, hookClient, &inst, 0, true); err != nil { + log.Printf(err.Error()) + } + }() + } else { + err = db.DBconn.Delete(v.storeName, key, v.tagInst) + if err != nil { + return pkgerrors.Wrap(err, "Delete Instance") + } + } + + return nil +} +//Continue the instantiation +func (v *InstanceClient) RecoverCreateOrDelete(id string) error { + instance, err := v.GetFull(id) + if err != nil { + return pkgerrors.Wrap(err, "Error getting instance " + id + ", skip this instance. Error detail") + } + log.Printf("Instance " + id + ", status: " + instance.Status + ", HookProgress: " + instance.HookProgress) + //have to resolve again template for this instance because all templates are in /tmp -> will be deleted when container restarts + overrideValues := []string{} + if instance.Request.OverrideValues != nil { + for k, v := range instance.Request.OverrideValues { + overrideValues = append(overrideValues, k + "=" + v) + } + } key := InstanceKey{ ID: id, } - err = db.DBconn.Delete(v.storeName, key, v.tagInst) + log.Printf(" Resolving template for release %s", instance.Request.ReleaseName) + _, hookList, _, err := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName) + instance.Hooks = hookList + err = db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err != nil { + return pkgerrors.Wrap(err, "Update Instance DB Entry") + } + + if strings.Contains(instance.Status, "FAILED"){ + log.Printf(" This instance has failed during instantiation, not going to recover") + return nil + } else if !strings.Contains(instance.Status, "-INSTALL") && !strings.Contains(instance.Status, "-DELETE") { + log.Printf(" This instance is not in hook state, not going to recover") + return nil + } + + splitHookProgress := strings.Split(instance.HookProgress,"/") + completedHooks,err := strconv.Atoi(splitHookProgress[0]) + if err != nil { + return pkgerrors.Wrap(err, "Error getting completed PRE-INSTALL hooks for instance " + instance.ID + ", skip. Error detail") + } + + //we can add an option to delete instances that will not be recovered from database to clean the db + if (instance.Status != "POST-INSTALL") && (instance.Status != "PRE-DELETE") && (instance.Status != "POST-DELETE") { + if instance.Status == "PRE-INSTALL" { + //Plugin quits during pre-install hooks -> Will do nothing because from SO point of view, there's no instance ID and will be reported as fail and be rolled back + log.Printf(" The plugin quits during pre-install hook of this instance, not going to recover") + } + return nil + } + k8sClient := KubernetesClient{} + err = k8sClient.Init(instance.Request.CloudRegion, id) if err != nil { - return pkgerrors.Wrap(err, "Delete Instance") + log.Printf(" Error getting CloudRegion %s", instance.Request.CloudRegion) + return nil + } + hookClient := NewHookClient(instance.Namespace, id, v.storeName, v.tagInst) + switch instance.Status { + case "POST-INSTALL": + //Plugin quits during post-install hooks -> continue + go func() { + log.Printf(" The plugin quits during post-install hook of this instance, continue post-install hook") + err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance) + log.Printf("dbData.HookProgress %s", instance.HookProgress) + if err != nil { + instance.Status = "POST-INSTALL-FAILED" + log.Printf(" Instance: %s, Error running postinstall hooks error: %s", id, err) + } else { + instance.Status = "DONE" + } + err = db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) + } + }() + case "PRE-DELETE": + //Plugin quits during pre-delete hooks -> This already effects the instance -> should continue the deletion + go func() { + log.Printf(" The plugin quits during pre-delete hook of this instance, continue pre-delete hook") + err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance) + if err != nil { + log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err) + instance.Status = "PRE-DELETE-FAILED" + err = db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) + } + return + } + + err = k8sClient.deleteResources(instance.Resources, instance.Namespace) + if err != nil { + log.Printf(" Error running deleting instance resources, error: %s", err) + return + } + //will not delete the instance in Db to avoid error when SO call delete again and there is not instance in DB + //the instance in DB will be deleted when SO call delete again. + instance.HookProgress = "" + if err := v.runPostDelete(k8sClient, hookClient, &instance, 0, false); err != nil { + log.Printf(err.Error()) + } + }() + case "POST-DELETE": + //Plugin quits during post-delete hooks -> continue + go func() { + log.Printf(" The plugin quits during post-delete hook of this instance, continue post-delete hook") + if err := v.runPostDelete(k8sClient, hookClient, &instance, completedHooks, true); err != nil { + log.Printf(err.Error()) + } + }() + default: + log.Printf(" This instance is not in hook state, not going to recover") } return nil } + +func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error { + key := InstanceKey{ + ID: instance.ID, + } + instance.Status = "POST-DELETE" + err := db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) + } + err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance) + if err != nil { + //If this case happen, user should clean the cluster + log.Printf(" Instance: %s, Error running post-delete hooks error: %s", instance.ID, err) + instance.Status = "POST-DELETE-FAILED" + err2 := db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err2 != nil { + log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) + return pkgerrors.Wrap(err2, "Delete Instance DB Entry") + } + return pkgerrors.Wrap(err, "Error running post-delete hooks") + } + if clearDb { + err = db.DBconn.Delete(v.storeName, key, v.tagInst) + if err != nil { + log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName) + return pkgerrors.Wrap(err, "Delete Instance DB Entry") + } + } else { + instance.Status = "DELETED" + err := db.DBconn.Update(v.storeName, key, v.tagInst, instance) + if err != nil { + log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) + return pkgerrors.Wrap(err, "Update Instance DB Entry") + } + } + + go func() { + //Clear all hook rss that does not have delete-on-success deletion policy + log.Printf("Clean leftover hook resource") + var remainHookRss []helm.KubernetesResource + for _, h := range instance.Hooks { + res := helm.KubernetesResource{ + GVK: h.KRT.GVK, + Name: h.Hook.Name, + } + if _, err := k8sClient.GetResourceStatus(res, hookClient.kubeNameSpace); err == nil { + remainHookRss = append(remainHookRss, res) + log.Printf(" Rss %s will be deleted.", res.Name) + } + } + if len(remainHookRss) > 0 { + err = k8sClient.deleteResources(remainHookRss, hookClient.kubeNameSpace) + if err != nil { + log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error()) + } + } + }() + + return nil +} diff --git a/src/k8splugin/internal/app/instance_test.go b/src/k8splugin/internal/app/instance_test.go index 2711a52f..890c4c99 100644 --- a/src/k8splugin/internal/app/instance_test.go +++ b/src/k8splugin/internal/app/instance_test.go @@ -1,5 +1,6 @@ /* Copyright 2018 Intel Corporation. +Copyright © 2021 Nokia Bell Labs. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -15,13 +16,13 @@ package app import ( "encoding/base64" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "log" "reflect" "sort" "testing" - utils "github.com/onap/multicloud-k8s/src/k8splugin/internal" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" @@ -794,3 +795,99 @@ func TestInstanceDelete(t *testing.T) { } }) } + +//TODO: add a test when pre-hook is failed (if possible) +func TestInstanceWithHookCreate(t *testing.T) { + err := LoadMockPlugins(utils.LoadedPlugins) + if err != nil { + t.Fatalf("LoadMockPlugins returned an error (%s)", err) + } + + // Load the mock kube config file into memory + fd, err := ioutil.ReadFile("../../mock_files/mock_configs/mock_kube_config") + if err != nil { + t.Fatal("Unable to read mock_kube_config") + } + + t.Run("Successfully create Instance With Hook", func(t *testing.T) { + db.DBconn = &db.MockDB{ + Items: map[string]map[string][]byte{ + rb.ProfileKey{RBName: "test-rbdef-hook", RBVersion: "v1", + ProfileName: "profile1"}.String(): { + "profilemetadata": []byte( + "{\"profile-name\":\"profile1\"," + + "\"release-name\":\"testprofilereleasename\"," + + "\"namespace\":\"testnamespace\"," + + "\"rb-name\":\"test-rbdef\"," + + "\"rb-version\":\"v1\"," + + "\"kubernetesversion\":\"1.12.3\"}"), + // base64 encoding of vagrant/tests/vnfs/testrb/helm/profile + "profilecontent": []byte("H4sICCVd3FwAA3Byb2ZpbGUxLnRhcgDt1NEKgjAUxvFd7ylG98aWO" + + "sGXiYELxLRwJvj2rbyoIPDGiuD/uzmwM9iB7Vvruvrgw7CdXHsUn6Ejm2W3aopcP9eZL" + + "YRJM1voPN+ZndAm16kVSn9onheXMLheKeGqfdM0rq07/3bfUv9PJUkiR9+H+tSVajRym" + + "M6+lEqN7njxoVSbU+z2deX388r9nWzkr8fGSt5d79pnLOZfm0f+dRrzb7P4DZD/LyDJA" + + "AAAAAAAAAAAAAAA/+0Ksq1N5QAoAAA="), + }, + rb.DefinitionKey{RBName: "test-rbdef-hook", RBVersion: "v1"}.String(): { + "defmetadata": []byte( + "{\"rb-name\":\"test-rbdef-hook\"," + + "\"rb-version\":\"v1\"," + + "\"chart-name\":\"test\"," + + "\"description\":\"testresourcebundle\"}"), + // base64 encoding of test helm package with hooks inside + "defcontent": []byte("H4sICE+Q8WAAA3Rlc3QudGFyAO1aW2+jOBTOM7/CYl6HlEsIq7xV24" + + "fVqluNdlYjrVajkQMnhS1gFjvZjbr972MDJYTQwGhMMmn9qVUaYx/o8TnfuRgGlF1NxoX" + + "J4Xmu+LQ812x+PmNiOXzEMe3ZfD4xLdO23QlyR36uAmvKcI7QhIXs6Ly+6xcKJvZ/g+M1" + + "0OkWJ/EY9xAbPJ/PXtx/m9tGtf+WOePjlu143gSZYzxMG298/9+hG1jhdcxQaQRoRXKU5" + + "WBEKVdMHEM+1d6hP8KIIv6D0Z/Xv90afE6CGYMAraIYxIQb8GOcAxeSR3gZczmMoCWgDF" + + "PKp0Up/8pCQAySLMbc6KYaDpIoXWgIhYQ8fAkgBgZfMhJH/naBdDFo0LXvAwQQvOey+E3" + + "BKIb9HDCLSKqfW3mvAIX//xzinI3m/r3+b7nzZ/83Z57gf9tyHeX/pwDOok+QU+5NC7Sx" + + "NJxl9VfdmppTU9cCoH4eZawYvEa/QJwgX1hMwRXCgKL0HiWcQyI/JutAS3ECi+KCtnkWV" + + "sjSzv3fKrRR+H/NyuNkgoPyv5npzRzxOxP+b9uOyv9Ogdb+BxgSklKQGg36+N+zZ7v9tw" + + "X/u3xM8f8p0OR/Tv70igeBhygNFuimMIWPwLQEGA4wwyJZK7n98RFNf+cZG6YwveMj6On" + + "JqE2nmkUz7POp+uPj3tRi+OlJ57NivISYCqlI3LtPLM3AF5Mpn+EzkpcLeSLqh7cNSYNk" + + "oToTraQ0/kWBeE/gQJH80apHFPBJynCUcuU+jxiV9uortfgowfdCV8s13S7Jf3p9gbKAJ" + + "8mI5WuoxxjbtkZ8kiRY7NlfOg31z9+y/y3/zwhlRpmLG3+TpRwW6PF/25l7Vf5nWZaIE9" + + "ac/6H8/xRo+v9SuNKOAH4ly4Gu37IaSy4DdEjHaUpYUQNWi/WQZ6VTGl6JAlFfoMaaw+v" + + "GvxDdh4xP042f9I7r1c3KYlQvn+pT2SMpqtbpYcmK/kf/rAkTD1wT1RL7D2S1uo2SiC2Q" + + "I490OjSyzz2Up+fwISc+UHq324kGaeQg7J59qOrtO9jUdHRIXDvqojFAZhwS2BEK26cns" + + "V5/z2sLU/+sGYahjWGA9qgGaMs0QPMV2J89tv31Wd+LttdlebawvHPT7g+DdvzPQXr474" + + "//7i7+25Yt8n/PVPH/JJBDv3tWIzv8HwjvJ996yWsM/gf6eOOxf08fskP/gXBZxneZgf9" + + "AHSruXzZa8Z9Cvol8kHsW1Nf/K+r/sv83dx3R/5u5rjr/PQla5z8l+X4srWAgAVc2I7nt" + + "B1lMtgmku75fRnJWLTOKLwtkces56AgOkXlutf8waPf/axVJpIDe/r9jtc5/XNszlf+fA" + + "kf6/ztvGXgAsFswNhV8xxFA8yFlnQE0ZV7YIUBH/V+9+XOy/v/M9qxd/PfMsv/vKv8/BY" + + "7F/2vfJ+vB7k9xUaJwC6oMaKh/dy0cVGXtph+p8d0R6iyptWvD3UbonLSky9PrxfZOWhp" + + "RzZOGQkbonrSkSzPAi+2ftBRyYQ2UtuV9Z87YVMhY+eOL95Bmi9YQW9Q7X2GWkNLuP6V8" + + "Sx2Q1B5La48yXFdq25XcHqS3qoKXg673f2QXAL3nf17j/M8U539zx1T5/0kg7/WLEfPYD" + + "vHDXsB4xZlsh07eeCrb0sgYLwF9czI71AgvM5vtUMmFpbPnpl8FBQUFBQUFBQUFBQUFBQ" + + "UFBQUFhdHwFf2f+3IAUAAA"), + }, + connection.ConnectionKey{CloudRegion: "mock_connection"}.String(): { + "metadata": []byte( + "{\"cloud-region\":\"mock_connection\"," + + "\"cloud-owner\":\"mock_owner\"," + + "\"kubeconfig\": \"" + base64.StdEncoding.EncodeToString(fd) + "\"}"), + }, + }, + } + + ic := NewInstanceClient() + input := InstanceRequest{ + RBName: "test-rbdef-hook", + RBVersion: "v1", + ProfileName: "profile1", + CloudRegion: "mock_connection", + } + + ir, err := ic.Create(input) + if err != nil { + t.Fatalf("TestInstanceWithHookCreate returned an error (%s)", err) + } + + log.Println(ir) + + if len(ir.Resources) == 0 { + t.Fatalf("TestInstanceWithHookCreate returned empty data (%+v)", ir) + } + }) +} diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go new file mode 100644 index 00000000..cb645afd --- /dev/null +++ b/src/k8splugin/internal/app/query.go @@ -0,0 +1,108 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * Copyright © 2021 Samsung Electronics + * Copyright © 2021 Orange + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + "k8s.io/apimachinery/pkg/runtime/schema" + + pkgerrors "github.com/pkg/errors" +) + +// QueryStatus is what is returned when status is queried for an instance +type QueryStatus struct { + ResourceCount int32 `json:"resourceCount"` + ResourcesStatus []ResourceStatus `json:"resourcesStatus"` +} + +// QueryManager is an interface exposes the instantiation functionality +type QueryManager interface { + Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error) +} + +// QueryClient implements the InstanceManager interface +// It will also be used to maintain some localized state +type QueryClient struct { + storeName string + tagInst string +} + +// NewQueryClient returns an instance of the QueryClient +// which implements the InstanceManager +func NewQueryClient() *QueryClient { + return &QueryClient{ + storeName: "rbdef", + tagInst: "instance", + } +} + +// Query returns state of instance's filtered resources +func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error) { + + //Read the status from the DD + + k8sClient := KubernetesClient{} + err := k8sClient.Init(cloudRegion, id) + if err != nil { + return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") + } + + var resourcesStatus []ResourceStatus + if labels != "" { + resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace) + if err != nil { + return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") + } + // If user specifies both label and name, we want to pick up only single resource from these matching label + if name != "" { + //Assigning 0-length, because we may actually not find matching name + resourcesStatus = make([]ResourceStatus, 0) + for _, res := range resList { + if res.Name == name { + resourcesStatus = append(resourcesStatus, res) + break + } + } + } else { + resourcesStatus = resList + } + } else if name != "" { + resIdentifier := helm.KubernetesResource{ + Name: name, + GVK: schema.FromAPIVersionAndKind(apiVersion, kind), + } + res, err := k8sClient.GetResourceStatus(resIdentifier, namespace) + if err != nil { + return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resource") + } + resourcesStatus = []ResourceStatus{res} + } else { + resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace) + if err != nil { + return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") + } + resourcesStatus = resList + } + + resp := QueryStatus{ + ResourceCount: int32(len(resourcesStatus)), + ResourcesStatus: resourcesStatus, + } + return resp, nil +} |