Diffstat (limited to 'src/k8splugin/internal/app/instance.go')
-rw-r--r-- | src/k8splugin/internal/app/instance.go | 594
1 file changed, 537 insertions, 57 deletions
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index c1ec35b6..1c9c81a9 100644
--- a/src/k8splugin/internal/app/instance.go
+++ b/src/k8splugin/internal/app/instance.go
@@ -1,6 +1,8 @@
 /*
  * Copyright 2018 Intel Corporation, Inc
  * Copyright © 2021 Samsung Electronics
+ * Copyright © 2021 Orange
+ * Copyright © 2021 Nokia Bell Labs
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,16 +20,29 @@ package app
 
 import (
+    "context"
     "encoding/json"
+    "log"
+    "strconv"
+    "strings"
+    "time"
+
+    appsv1 "k8s.io/api/apps/v1"
+    batchv1 "k8s.io/api/batch/v1"
+    corev1 "k8s.io/api/core/v1"
+    apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/cli-runtime/pkg/resource"
+
     "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
     "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
     "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator"
     "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-    "log"
-    "strings"
+    "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
 
     pkgerrors "github.com/pkg/errors"
+    "helm.sh/helm/v3/pkg/release"
 )
 
 // InstanceRequest contains the parameters needed for instantiation
@@ -52,6 +67,22 @@ type InstanceResponse struct {
     Hooks     []*helm.Hook              `json:"-"`
 }
 
+// InstanceDbData contains the data to put into the DB
+type InstanceDbData struct {
+    ID                 string                    `json:"id"`
+    Request            InstanceRequest           `json:"request"`
+    Namespace          string                    `json:"namespace"`
+    Status             string                    `json:"status"`
+    ReleaseName        string                    `json:"release-name"`
+    Resources          []helm.KubernetesResource `json:"resources"`
+    Hooks              []*helm.Hook              `json:"hooks"`
+    HookProgress       string                    `json:"hook-progress"`
+    PreInstallTimeout  int64                     `json:"PreInstallTimeout"`
+    PostInstallTimeout int64                     `json:"PostInstallTimeout"`
+    PreDeleteTimeout   int64                     `json:"PreDeleteTimeout"`
+    PostDeleteTimeout  int64                     `json:"PostDeleteTimeout"`
+}
+
 // InstanceMiniResponse contains the response from instantiation
 // It does NOT include the created resources.
 // Use the regular GET to get the created resources for a particular instance
@@ -74,11 +105,13 @@ type InstanceStatus struct {
 type InstanceManager interface {
     Create(i InstanceRequest) (InstanceResponse, error)
     Get(id string) (InstanceResponse, error)
+    GetFull(id string) (InstanceDbData, error)
     Status(id string) (InstanceStatus, error)
     Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
     List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
     Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
     Delete(id string) error
+    RecoverCreateOrDelete(id string) error
 }
 
 // InstanceKey is used as the primary key in the db
@@ -100,13 +133,16 @@ func (dk InstanceKey) String() string {
 // InstanceClient implements the InstanceManager interface
 // It will also be used to maintain some localized state
 type InstanceClient struct {
-    storeName string
-    tagInst   string
+    storeName string
+    tagInst   string
 }
 
 // NewInstanceClient returns an instance of the InstanceClient
 // which implements the InstanceManager
 func NewInstanceClient() *InstanceClient {
+    //TODO: Call RecoverCreateOrDelete to perform recovery when the plugin restarts.
+    //Not implemented here for now because we have an issue with the current test set (data race)
+
     return &InstanceClient{
         storeName: "rbdef",
         tagInst:   "instance",
@@ -125,7 +161,6 @@ func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName
 
 // Create an instance of rb on the cluster in the database
 func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
-
     // Name is required
     if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" {
         return InstanceResponse{},
@@ -141,10 +176,52 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
     //Convert override values from map to array of strings of the following format
     //foo=bar
     overrideValues := []string{}
+    var preInstallTimeOut, postInstallTimeOut, preDeleteTimeout, postDeleteTimeout int64
     if i.OverrideValues != nil {
+        preInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-install-timeout"]
+        if !ok {
+            preInstallTimeOutStr = "60"
+        }
+        preInstallTimeOut, err = strconv.ParseInt(preInstallTimeOutStr, 10, 64)
+        if err != nil {
+            return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-install-timeout")
+        }
+
+        postInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-install-timeout"]
+        if !ok {
+            postInstallTimeOutStr = "600"
+        }
+        postInstallTimeOut, err = strconv.ParseInt(postInstallTimeOutStr, 10, 64)
+        if err != nil {
+            return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-install-timeout")
+        }
+
+        preDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-delete-timeout"]
+        if !ok {
+            preDeleteTimeOutStr = "60"
+        }
+        preDeleteTimeout, err = strconv.ParseInt(preDeleteTimeOutStr, 10, 64)
+        if err != nil {
+            return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-delete-timeout")
+        }
+
+        postDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-delete-timeout"]
+        if !ok {
+            postDeleteTimeOutStr = "600"
+        }
+        postDeleteTimeout, err = strconv.ParseInt(postDeleteTimeOutStr, 10, 64)
+        if err != nil {
+            return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-delete-timeout")
+        }
+
         for k, v := range i.OverrideValues {
             overrideValues = append(overrideValues, k+"="+v)
         }
+    } else {
+        preInstallTimeOut = 60
+        postInstallTimeOut = 600
+        preDeleteTimeout = 60
+        postDeleteTimeout = 600
     }
 
     //Execute the kubernetes create command
@@ -162,11 +239,93 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
         return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
     }
 
-    createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
+    log.Printf("Main rss info")
+    for _, t := range sortedTemplates {
+        log.Printf("  Path: %s", t.FilePath)
+        log.Printf("  Kind: %s", t.GVK.Kind)
+    }
+
+    log.Printf("Hook info")
+    for _, h := range hookList {
+        log.Printf("  Name: %s", h.Hook.Name)
+        log.Printf("  Events: %s", h.Hook.Events)
+        log.Printf("  Weight: %d", h.Hook.Weight)
+        log.Printf("  DeletePolicies: %s", h.Hook.DeletePolicies)
+    }
+    dbData := InstanceDbData{
+        ID:                 id,
+        Request:            i,
+        Namespace:          profile.Namespace,
+        ReleaseName:        releaseName,
+        Status:             "PRE-INSTALL",
+        Resources:          []helm.KubernetesResource{},
+        Hooks:              hookList,
+        HookProgress:       "",
+        PreInstallTimeout:  preInstallTimeOut,
+        PostInstallTimeout: postInstallTimeOut,
+        PreDeleteTimeout:   preDeleteTimeout,
+        PostDeleteTimeout:  postDeleteTimeout,
+    }
+
+    key := InstanceKey{
+        ID: id,
+    }
+    err = db.DBconn.Create(v.storeName, key, v.tagInst, dbData)
+    if err != nil {
+        return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+    }
+
+    err = k8sClient.ensureNamespace(profile.Namespace)
+    if err != nil {
+        return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
+    }
+
+    hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst)
+    if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 {
+        err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, preInstallTimeOut, 0, &dbData)
+        if err != nil {
+            log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err)
+            err2 := db.DBconn.Delete(v.storeName, key, v.tagInst)
+            if err2 != nil {
+                log.Printf("Error cleaning failed instance in DB, please check DB.")
+            }
+            return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preinstall hooks")
+        }
+    }
+
+    dbData.Status = "CREATING"
+    err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+    if err != nil {
+        err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+        if err != nil {
+            log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
+        }
+        return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
+    }
+
+    //Main rss creation is supposed to be very quick -> no need to support recovery for main rss
+    createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
     if err != nil {
+        if len(createdResources) > 0 {
+            log.Printf("[Instance] Reverting created resources on Error: %s", err.Error())
+            k8sClient.deleteResources(createdResources, profile.Namespace)
+        }
+        log.Printf("  Instance: %s, Main rss failed, skip post-install and remove instance in DB", id)
+        //main rss creation failed -> remove the instance from the DB
+        err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+        if err != nil {
+            log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
+        }
         return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources")
     }
 
+    dbData.Status = "CREATED"
+    dbData.Resources = createdResources
+    err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+    if err != nil {
+        return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
+    }
+
     //Compose the return response
     resp := InstanceResponse{
         ID: id,
@@ -177,15 +336,71 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
         Hooks:     hookList,
     }
 
+    if len(hookClient.getHookByEvent(hookList, release.HookPostInstall)) != 0 {
+        go func() {
+            dbData.Status = "POST-INSTALL"
+            dbData.HookProgress = ""
+            err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, postInstallTimeOut, 0, &dbData)
+            if err != nil {
+                dbData.Status = "POST-INSTALL-FAILED"
+                log.Printf("  Instance: %s, Error running postinstall hooks error: %s", id, err)
+            } else {
+                dbData.Status = "DONE"
+            }
+            err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+            if err != nil {
+                log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
+            }
+        }()
+    } else {
+        dbData.Status = "DONE"
+        err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+        if err != nil {
+            log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
+        }
+    }
+
+    return resp, nil
+}
+
+// GetFull returns the full instance data for the corresponding ID
+func (v *InstanceClient) GetFull(id string) (InstanceDbData, error) {
     key := InstanceKey{
         ID: id,
     }
 
-    err = db.DBconn.Create(v.storeName, key, v.tagInst, resp)
+    value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
     if err != nil {
-        return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+        return InstanceDbData{}, pkgerrors.Wrap(err, "Get Instance")
     }
 
-    return resp, nil
+    //value is a byte array
+    if value != nil {
+        resp := InstanceDbData{}
+        err = db.DBconn.Unmarshal(value, &resp)
+        if err != nil {
+            return InstanceDbData{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
+        }
+        //In case we are communicating with an old DB, some fields may be missing -> fill them with default values
+        if resp.Status == "" {
+            //An instance that is already in the DB is considered DONE
+            resp.Status = "DONE"
+        }
+        if resp.PreInstallTimeout == 0 {
+            resp.PreInstallTimeout = 60
+        }
+        if resp.PostInstallTimeout == 0 {
+            resp.PostInstallTimeout = 600
+        }
+        if resp.PreDeleteTimeout == 0 {
+            resp.PreDeleteTimeout = 60
+        }
+        if resp.PostDeleteTimeout == 0 {
+            resp.PostDeleteTimeout = 600
+        }
+        return resp, nil
+    }
+
+    return InstanceDbData{}, pkgerrors.New("Error getting Instance")
 }
 
 // Get returns the instance for corresponding ID
@@ -214,6 +429,7 @@ func (v *InstanceClient) Get(id string) (InstanceResponse, error) {
 
 // Query returns state of instance's filtered resources
 func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) {
+    queryClient := NewQueryClient()
     //Read the status from the DB
     key := InstanceKey{
         ID: id,
@@ -231,54 +447,21 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta
         return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
     }
 
-    k8sClient := KubernetesClient{}
-    err = k8sClient.Init(resResp.Request.CloudRegion, id)
+    resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id)
     if err != nil {
-        return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
-    }
-
-    var resourcesStatus []ResourceStatus
-    if labels != "" {
-        resList, err := k8sClient.queryResources(apiVersion, kind, labels, resResp.Namespace)
-        if err != nil {
-            return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
-        }
-        // If user specifies both label and name, we want to pick up only single resource from these matching label
-        if name != "" {
-            //Assigning 0-length, because we may actually not find matching name
-            resourcesStatus = make([]ResourceStatus, 0)
-            for _, res := range resList {
-                if res.Name == name {
-                    resourcesStatus = append(resourcesStatus, res)
-                    break
-                }
-            }
-        } else {
-            resourcesStatus = resList
-        }
-    } else if name != "" {
-        resIdentifier := helm.KubernetesResource{
-            Name: name,
-            GVK:  schema.FromAPIVersionAndKind(apiVersion, kind),
-        }
-        res, err := k8sClient.GetResourceStatus(resIdentifier, resResp.Namespace)
-        if err != nil {
-            return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resource")
-        }
-        resourcesStatus = []ResourceStatus{res}
+        return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
     }
 
     resp := InstanceStatus{
         Request:         resResp.Request,
-        ResourceCount:   int32(len(resourcesStatus)),
-        ResourcesStatus: resourcesStatus,
+        ResourceCount:   resources.ResourceCount,
+        ResourcesStatus: resources.ResourcesStatus,
     }
     return resp, nil
 }
 
 // Status returns the status for the instance
 func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
-
     //Read the status from the DB
     key := InstanceKey{
         ID: id,
@@ -294,7 +477,7 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
         return InstanceStatus{}, pkgerrors.New("Status is not available")
     }
 
-    resResp := InstanceResponse{}
+    resResp := InstanceDbData{}
     err = db.DBconn.Unmarshal(value, &resResp)
     if err != nil {
         return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
@@ -312,25 +495,36 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
         cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
     }
 
+    isReady := true
    generalStatus := make([]ResourceStatus, 0, len(resResp.Resources))
 Main:
-    for _, resource := range resResp.Resources {
+    for _, oneResource := range resResp.Resources {
         for _, pod := range podsStatus {
-            if resource.GVK == pod.GVK && resource.Name == pod.Name {
+            if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name {
                 continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart
             }
         }
-        status, err := k8sClient.GetResourceStatus(resource, resResp.Namespace)
+        status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace)
         if err != nil {
             cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+            isReady = false
         } else {
             generalStatus = append(generalStatus, status)
+            ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
+
+            if !ready || err != nil {
+                isReady = false
+                if err != nil {
+                    cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+                }
+            }
         }
     }
 
+    //We still need to iterate through the rss list even when the status is not DONE, to gather the status of rss + pods for the response
     resp := InstanceStatus{
         Request:         resResp.Request,
         ResourceCount:   int32(len(generalStatus) + len(podsStatus)),
-        Ready:           false, //FIXME To determine readiness, some parsing of status fields is necessary
+        Ready:           isReady && resResp.Status == "DONE",
         ResourcesStatus: append(generalStatus, podsStatus...),
     }
 
@@ -344,6 +538,68 @@ Main:
     return resp, nil
 }
 
+func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error) {
+    readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true))
+    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
+    defer cancel()
+
+    apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
+    log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
+    restClient, err := k8sClient.getRestApi(apiVersion)
+    if err != nil {
+        return false, err
+    }
+    mapper := k8sClient.GetMapper()
+    mapping, err := mapper.RESTMapping(schema.GroupKind{
+        Group: rss.GVK.Group,
+        Kind:  rss.GVK.Kind,
+    }, rss.GVK.Version)
+    if err != nil {
+        return false, err
+    }
+    resourceInfo := resource.Info{
+        Client:          restClient,
+        Mapping:         mapping,
+        Namespace:       namespace,
+        Name:            rss.Name,
+        Source:          "",
+        Object:          nil,
+        ResourceVersion: "",
+    }
+
+    var parsedRes runtime.Object
+    //TODO: Should we care about different api versions for the same kind?
+    switch kind {
+    case "Pod":
+        parsedRes = new(corev1.Pod)
+    case "Job":
+        parsedRes = new(batchv1.Job)
+    case "Deployment":
+        parsedRes = new(appsv1.Deployment)
+    case "PersistentVolumeClaim":
+        parsedRes = new(corev1.PersistentVolumeClaim)
+    case "Service":
+        parsedRes = new(corev1.Service)
+    case "DaemonSet":
+        parsedRes = new(appsv1.DaemonSet)
+    case "CustomResourceDefinition":
+        parsedRes = new(apiextv1.CustomResourceDefinition)
+    case "StatefulSet":
+        parsedRes = new(appsv1.StatefulSet)
+    case "ReplicationController":
+        parsedRes = new(corev1.ReplicationController)
+    case "ReplicaSet":
+        parsedRes = new(appsv1.ReplicaSet)
+    default:
+        //Kinds not listed here are considered ready
+        return true, nil
+    }
+    err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes)
+    if err != nil {
+        return false, err
+    }
+    resourceInfo.Object = parsedRes
+    ready, err := readyChecker.IsReady(ctx, &resourceInfo)
+    return ready, err
+}
+
 // List returns the instance for corresponding ID
 // Empty string returns all
 func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) {
@@ -358,7 +614,7 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance
     for key, value := range dbres {
         //value is a byte array
         if value != nil {
-            resp := InstanceResponse{}
+            resp := InstanceDbData{}
             err = db.DBconn.Unmarshal(value, &resp)
             if err != nil {
                 log.Printf("[Instance] Error: %s Unmarshaling Instance: %s", err.Error(), key)
@@ -385,6 +641,11 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance
             continue
         }
 
+        if resp.Status == "PRE-INSTALL" {
+            //Do not list instances that are still in the pre-install phase
+            continue
+        }
+
         results = append(results, miniresp)
     }
 }
@@ -423,7 +684,6 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab
         if add {
             ret = append(ret, resp)
         }
-
     }
 
     return ret, nil
@@ -431,29 +691,249 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab
 
 // Delete the Instance from database
 func (v *InstanceClient) Delete(id string) error {
-    inst, err := v.Get(id)
+    inst, err := v.GetFull(id)
     if err != nil {
         return pkgerrors.Wrap(err, "Error getting Instance")
     }
+    key := InstanceKey{
+        ID: id,
+    }
+    if inst.Status == "DELETED" {
+        //The instance was already deleted while the plugin was coming back up -> just remove it from the DB
+        err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+        if err != nil {
+            log.Printf("Delete Instance DB Entry for release %s has error.", inst.ReleaseName)
+        }
+        return nil
+    } else if inst.Status != "DONE" {
+        //Recovery is ongoing, do nothing here
+        return nil
+    }
 
     k8sClient := KubernetesClient{}
     err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
     if err != nil {
         return pkgerrors.Wrap(err, "Getting CloudRegion Information")
     }
 
+    inst.Status = "PRE-DELETE"
+    inst.HookProgress = ""
+    err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+    if err != nil {
+        log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+    }
+
+    hookClient := NewHookClient(inst.Namespace, id, v.storeName, v.tagInst)
+    if len(hookClient.getHookByEvent(inst.Hooks, release.HookPreDelete)) != 0 {
+        err = hookClient.ExecHook(k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst)
+        if err != nil {
+            log.Printf("  Instance: %s, Error running pre-delete hooks error: %s", id, err)
+            inst.Status = "PRE-DELETE-FAILED"
+            err2 := db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+            if err2 != nil {
+                log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+            }
+            return pkgerrors.Wrap(err, "Error running pre-delete hooks")
+        }
+    }
+
+    inst.Status = "DELETING"
+    err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+    if err != nil {
+        log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+    }
+
     err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
     if err != nil {
         return pkgerrors.Wrap(err, "Deleting Instance Resources")
     }
 
+    if len(hookClient.getHookByEvent(inst.Hooks, release.HookPostDelete)) != 0 {
+        go func() {
+            inst.HookProgress = ""
+            if err := v.runPostDelete(k8sClient, hookClient, &inst, 0, true); err != nil {
+                log.Printf(err.Error())
+            }
+        }()
+    } else {
+        err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+        if err != nil {
+            return pkgerrors.Wrap(err, "Delete Instance")
+        }
+    }
+    return nil
+}
+
+//RecoverCreateOrDelete continues the instantiation or deletion after a plugin restart
+func (v *InstanceClient) RecoverCreateOrDelete(id string) error {
+    instance, err := v.GetFull(id)
+    if err != nil {
+        return pkgerrors.Wrap(err, "Error getting instance "+id+", skip this instance. Error detail")
+    }
+    log.Printf("Instance " + id + ", status: " + instance.Status + ", HookProgress: " + instance.HookProgress)
+    //we have to resolve the template for this instance again because all templates live in /tmp -> they are gone when the container restarts
+    overrideValues := []string{}
+    if instance.Request.OverrideValues != nil {
+        for k, v := range instance.Request.OverrideValues {
+            overrideValues = append(overrideValues, k+"="+v)
+        }
+    }
     key := InstanceKey{
         ID: id,
     }
-    err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+    log.Printf("  Resolving template for release %s", instance.Request.ReleaseName)
+    _, hookList, _, err := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName)
+    if err != nil {
+        return pkgerrors.Wrap(err, "Error resolving profile templates")
+    }
+    instance.Hooks = hookList
+    err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+    if err != nil {
+        return pkgerrors.Wrap(err, "Update Instance DB Entry")
+    }
+
+    if strings.Contains(instance.Status, "FAILED") {
+        log.Printf("  This instance failed during instantiation, not going to recover")
+        return nil
+    } else if !strings.Contains(instance.Status, "-INSTALL") && !strings.Contains(instance.Status, "-DELETE") {
+        log.Printf("  This instance is not in a hook state, not going to recover")
+        return nil
+    }
+
+    splitHookProgress := strings.Split(instance.HookProgress, "/")
+    completedHooks, err := strconv.Atoi(splitHookProgress[0])
+    if err != nil {
+        return pkgerrors.Wrap(err, "Error getting completed PRE-INSTALL hooks for instance "+instance.ID+", skip. Error detail")
+    }
+
+    //we could add an option to delete instances that will not be recovered, to clean up the database
+    if (instance.Status != "POST-INSTALL") && (instance.Status != "PRE-DELETE") && (instance.Status != "POST-DELETE") {
+        if instance.Status == "PRE-INSTALL" {
+            //The plugin quit during pre-install hooks -> do nothing: from SO's point of view there is no instance ID yet, so the request is reported as failed and rolled back
+            log.Printf("  The plugin quit during the pre-install hook of this instance, not going to recover")
+        }
+        return nil
+    }
+    k8sClient := KubernetesClient{}
+    err = k8sClient.Init(instance.Request.CloudRegion, id)
+    if err != nil {
+        log.Printf("  Error getting CloudRegion %s", instance.Request.CloudRegion)
+        return nil
+    }
+    hookClient := NewHookClient(instance.Namespace, id, v.storeName, v.tagInst)
+    switch instance.Status {
+    case "POST-INSTALL":
+        //The plugin quit during post-install hooks -> continue them
+        go func() {
+            log.Printf("  The plugin quit during the post-install hook of this instance, continuing the post-install hooks")
+            err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance)
+            log.Printf("dbData.HookProgress %s", instance.HookProgress)
+            if err != nil {
+                instance.Status = "POST-INSTALL-FAILED"
+                log.Printf("  Instance: %s, Error running postinstall hooks error: %s", id, err)
+            } else {
+                instance.Status = "DONE"
+            }
+            err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+            if err != nil {
+                log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+            }
+        }()
+    case "PRE-DELETE":
+        //The plugin quit during pre-delete hooks -> these already affect the instance -> continue the deletion
+        go func() {
+            log.Printf("  The plugin quit during the pre-delete hook of this instance, continuing the pre-delete hooks")
+            err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance)
+            if err != nil {
+                log.Printf("  Instance: %s, Error running pre-delete hooks error: %s", id, err)
+                instance.Status = "PRE-DELETE-FAILED"
+                err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+                if err != nil {
+                    log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+                }
+                return
+            }
+
+            err = k8sClient.deleteResources(instance.Resources, instance.Namespace)
+            if err != nil {
+                log.Printf("  Error deleting instance resources, error: %s", err)
+                return
+            }
+            //we do not delete the instance from the DB here, to avoid an error when SO calls delete again and the instance is already gone;
+            //the instance is removed from the DB when SO calls delete again
+            instance.HookProgress = ""
+            if err := v.runPostDelete(k8sClient, hookClient, &instance, 0, false); err != nil {
+                log.Printf(err.Error())
+            }
+        }()
+    case "POST-DELETE":
+        //The plugin quit during post-delete hooks -> continue them
+        go func() {
+            log.Printf("  The plugin quit during the post-delete hook of this instance, continuing the post-delete hooks")
+            if err := v.runPostDelete(k8sClient, hookClient, &instance, completedHooks, true); err != nil {
+                log.Printf(err.Error())
+            }
+        }()
+    default:
+        log.Printf("  This instance is not in a hook state, not going to recover")
+    }
+
+    return nil
+}
+
+func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error {
+    key := InstanceKey{
+        ID: instance.ID,
+    }
+    instance.Status = "POST-DELETE"
+    err := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
     if err != nil {
-        return pkgerrors.Wrap(err, "Delete Instance")
+        log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+    }
+    err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance)
+    if err != nil {
+        //If this happens, the user should clean up the cluster manually
+        log.Printf("  Instance: %s, Error running post-delete hooks error: %s", instance.ID, err)
+        instance.Status = "POST-DELETE-FAILED"
+        err2 := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+        if err2 != nil {
+            log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+            return pkgerrors.Wrap(err2, "Delete Instance DB Entry")
+        }
+        return pkgerrors.Wrap(err, "Error running post-delete hooks")
     }
+    if clearDb {
+        err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+        if err != nil {
+            log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName)
+            return pkgerrors.Wrap(err, "Delete Instance DB Entry")
+        }
+    } else {
+        instance.Status = "DELETED"
+        err := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+        if err != nil {
+            log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+            return pkgerrors.Wrap(err, "Update Instance DB Entry")
+        }
+    }
+
+    go func() {
+        //Clean up all hook rss that do not have the delete-on-success deletion policy
+        log.Printf("Clean leftover hook resources")
+        var remainHookRss []helm.KubernetesResource
+        for _, h := range instance.Hooks {
+            res := helm.KubernetesResource{
+                GVK:  h.KRT.GVK,
+                Name: h.Hook.Name,
+            }
+            if _, err := k8sClient.GetResourceStatus(res, hookClient.kubeNameSpace); err == nil {
+                remainHookRss = append(remainHookRss, res)
+                log.Printf("  Rss %s will be deleted.", res.Name)
+            }
+        }
+        if len(remainHookRss) > 0 {
+            err = k8sClient.deleteResources(remainHookRss, hookClient.kubeNameSpace)
+            if err != nil {
+                log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error())
+            }
+        }
+    }()
 
     return nil
 }
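The hook-timeout handling introduced in Create reads four well-known keys out of OverrideValues, falling back to 60 seconds for pre hooks and 600 seconds for post hooks. A minimal caller-side sketch of a request that overrides them (the concrete field values below are invented for illustration; only the key names, value format, and defaults come from this change, and other InstanceRequest fields such as ReleaseName are omitted):

    req := InstanceRequest{
        RBName:      "edgex",    // example values, not from this commit
        RBVersion:   "v1",
        ProfileName: "profile1",
        CloudRegion: "kud",
        OverrideValues: map[string]string{
            // Keys and defaults as parsed in Create(); values are seconds.
            "k8s-rb-instance-pre-install-timeout":  "120", // default "60"
            "k8s-rb-instance-post-install-timeout": "900", // default "600"
            "k8s-rb-instance-pre-delete-timeout":   "60",  // default "60"
            "k8s-rb-instance-post-delete-timeout":  "600", // default "600"
        },
    }
    resp, err := NewInstanceClient().Create(req)

Because the values travel in the generic OverrideValues map, older callers that never set these keys keep the default timeouts, and GetFull backfills the same defaults when reading records written by a pre-change database.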
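The create/delete/recovery flow is driven by the raw status strings persisted in InstanceDbData.Status; the code compares plain literals. Gathering them into one hypothetical const block just to make the state machine visible (the grouping and identifier names are illustrative, not part of the change):

    // Lifecycle states written to dbData.Status / inst.Status in this diff.
    const (
        StatusPreInstall       = "PRE-INSTALL"         // pre-install hooks running; hidden from List(), not recovered
        StatusCreating         = "CREATING"            // main resources being created
        StatusCreated          = "CREATED"             // main resources created, before post-install hooks
        StatusPostInstall      = "POST-INSTALL"        // post-install hooks running (async goroutine)
        StatusPostInstallFail  = "POST-INSTALL-FAILED"
        StatusDone             = "DONE"                // the only state from which Delete proceeds
        StatusPreDelete        = "PRE-DELETE"
        StatusPreDeleteFail    = "PRE-DELETE-FAILED"
        StatusDeleting         = "DELETING"            // main resources being deleted
        StatusPostDelete       = "POST-DELETE"
        StatusPostDeleteFail   = "POST-DELETE-FAILED"
        StatusDeleted          = "DELETED"             // kept in the DB until the next Delete call removes it
    )

RecoverCreateOrDelete only resumes work for POST-INSTALL, PRE-DELETE, and POST-DELETE; any *-FAILED state and PRE-INSTALL are deliberately left alone.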