diff options
Diffstat (limited to 'src/k8splugin/internal/app')
-rw-r--r-- | src/k8splugin/internal/app/client.go | 24 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config.go | 210 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_backend.go | 207 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_test.go | 5 | ||||
-rw-r--r-- | src/k8splugin/internal/app/instance.go | 11 |
5 files changed, 342 insertions, 115 deletions
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index 9813333e..06c4c464 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -20,25 +20,30 @@ package app import ( "context" "io/ioutil" + appsv1 "k8s.io/api/apps/v1" + //appsv1beta1 "k8s.io/api/apps/v1beta1" //appsv1beta2 "k8s.io/api/apps/v1beta2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + //extensionsv1beta1 "k8s.io/api/extensions/v1beta1" //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" //apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + logger "log" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" - logger "log" pkgerrors "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -545,9 +550,18 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac return pkgerrors.Wrap(err, "Error loading plugin") } - err = pluginImpl.Delete(resource, namespace, k) - if err != nil { - return pkgerrors.Wrap(err, "Error deleting "+resource.Name) + name, err := pluginImpl.Get(resource, namespace, k) + + if (err == nil && name == resource.Name) || (err != nil && strings.Contains(err.Error(), "not found") == false) { + err = pluginImpl.Delete(resource, namespace, k) + if err != nil { + return pkgerrors.Wrap(err, "Error deleting "+resource.Name) + } + } else { + log.Warn("Resource does not exist, Skipping delete", log.Fields{ + "gvk": resource.GVK, + "resource": resource.Name, + }) } return nil diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go index 94acadcc..8952c16d 100644 --- a/src/k8splugin/internal/app/config.go +++ b/src/k8splugin/internal/app/config.go @@ -18,10 +18,11 @@ package app import ( + "log" "strconv" "strings" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" pkgerrors "github.com/pkg/errors" ) @@ -66,8 +67,10 @@ type ConfigManager interface { Help() map[string]string Update(instanceID, configName string, p Config) (ConfigResult, error) Delete(instanceID, configName string) (ConfigResult, error) - Rollback(instanceID string, p ConfigRollback) error - Tagit(instanceID string, p ConfigTagit) error + DeleteAll(instanceID, configName string) error + Rollback(instanceID string, configName string, p ConfigRollback) error + Cleanup(instanceID string) error + Tagit(instanceID string, configName string, p ConfigTagit) error } // ConfigClient implements the ConfigManager @@ -94,7 +97,7 @@ func (v *ConfigClient) Help() map[string]string { // Create an entry for the config in the database func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) { - + log.Printf("[Config Create] Instance %s", instanceID) // Check required fields if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 { return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") @@ -120,10 +123,12 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "POST") + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, p, profileChannel, "POST", nil) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("POST result: %s", appliedResources) // Create Config DB Entry err = cs.createConfig(p) if err != nil { @@ -132,8 +137,9 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Create Version Entry in DB for Config cvs := ConfigVersionStore{ instanceID: instanceID, + configName: p.ConfigName, } - version, err := cvs.createConfigVersion(p, Config{}, "POST") + version, err := cvs.createConfigVersion(p, Config{}, "POST", appliedResources) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") } @@ -153,7 +159,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Update an entry for the config in the database func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigResult, error) { - + log.Printf("[Config Update] Instance %s Config %s", instanceID, configName) // Check required fields if len(p.Values) == 0 { return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") @@ -176,10 +182,12 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "PUT") + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, p, profileChannel, "PUT", nil) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("PUT result: %s", appliedResources) // Update Config DB Entry configPrev, err := cs.updateConfig(p) if err != nil { @@ -188,8 +196,9 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe // Create Version Entry in DB for Config cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - version, err := cvs.createConfigVersion(p, configPrev, "PUT") + version, err := cvs.createConfigVersion(p, configPrev, "PUT", appliedResources) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") } @@ -245,8 +254,49 @@ func (v *ConfigClient) List(instanceID string) ([]Config, error) { } // Delete the Config from database -func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) { +func (v *ConfigClient) DeleteAll(instanceID, configName string) error { + log.Printf("[Config Delete All] Instance %s Config %s", instanceID, configName) + // Check if Config exists + cs := ConfigStore{ + instanceID: instanceID, + configName: configName, + } + _, err := cs.getConfig() + if err != nil { + return pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + // Get Version Entry in DB for Config + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + currentVersion, err := cvs.getCurrentVersion(configName) + if err != nil { + return pkgerrors.Wrap(err, "Current version get failed") + } + _, _, action, _, err := cvs.getConfigVersion(configName, currentVersion) + if err != nil { + return pkgerrors.Wrap(err, "Config version get failed") + } + + if action != "DELETE" { + _, err = v.Delete(instanceID, configName) + if err != nil { + return pkgerrors.Wrap(err, "Config DELETE version failed") + } + } + // Delete Config from DB + _, err = cs.deleteConfig() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + cvs.cleanupIstanceTags(configName) + return nil +} +// Apply update with delete operation +func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) { + log.Printf("[Config Delete] Instance %s Config %s", instanceID, configName) // Resolving rbName, Version, etc. not to break response rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) if err != nil { @@ -259,28 +309,39 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro } p, err := cs.getConfig() if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Error - Config doesn't exist") } lock, profileChannel := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "DELETE") + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + return ConfigResult{}, pkgerrors.Wrap(err, "Current version get failed") } - // Delete Config from DB - configPrev, err := cs.deleteConfig() + _, _, _, resources, err := cvs.getConfigVersion(configName, currentVersion) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry") + return ConfigResult{}, pkgerrors.Wrap(err, "Config version get failed") } - // Create Version Entry in DB for Config - cvs := ConfigVersionStore{ - instanceID: instanceID, + + _, err = applyConfig(instanceID, p, profileChannel, "DELETE", resources) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + log.Printf("DELETE resources: [%s]", resources) + // Update Config from DB + configPrev, err := cs.updateConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry") } - version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE") + version, err := cvs.createConfigVersion(p, configPrev, "DELETE", []helm.KubernetesResource{}) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry") + return ConfigResult{}, pkgerrors.Wrap(err, "Create Delete Config Version DB Entry") } // Create Result structure @@ -297,13 +358,13 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro } // Rollback starts from current version and rollbacks to the version desired -func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { - +func (v *ConfigClient) Rollback(instanceID string, configName string, rback ConfigRollback) error { + log.Printf("[Config Rollback] Instance %s Config %s", instanceID, configName) var reqVersion string var err error if rback.AnyOf.ConfigTag != "" { - reqVersion, err = v.GetTagVersion(instanceID, rback.AnyOf.ConfigTag) + reqVersion, err = v.GetTagVersion(instanceID, configName, rback.AnyOf.ConfigTag) if err != nil { return pkgerrors.Wrap(err, "Rollback Invalid tag") } @@ -326,8 +387,9 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Rollback Get Current Config Version ") } @@ -338,40 +400,35 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { //Rollback all the intermettinent configurations for i := currentVersion; i > rollbackIndex; i-- { - configNew, configPrev, action, err := cvs.getConfigVersion(i) + configNew, configPrev, _, resources, err := cvs.getConfigVersion(configName, i) if err != nil { return pkgerrors.Wrap(err, "Rollback Get Config Version") } + _, _, prevAction, _, err := cvs.getConfigVersion(configName, i-1) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Prev Config Version") + } cs := ConfigStore{ instanceID: instanceID, configName: configNew.ConfigName, } - if action == "PUT" { - // PUT is proceeded by PUT or POST - err = applyConfig(instanceID, configPrev, profileChannel, "PUT") + if prevAction != "DELETE" { + appliedResources, err := applyConfig(instanceID, configPrev, profileChannel, prevAction, nil) if err != nil { return pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("%s result: %s", prevAction, appliedResources) _, err = cs.updateConfig(configPrev) if err != nil { return pkgerrors.Wrap(err, "Update Config DB Entry") } - } else if action == "POST" { + } else { // POST is always preceeded by Config not existing - err = applyConfig(instanceID, configNew, profileChannel, "DELETE") - if err != nil { - return pkgerrors.Wrap(err, "Delete Config failed") - } - _, err = cs.deleteConfig() - if err != nil { - return pkgerrors.Wrap(err, "Delete Config DB Entry") - } - } else if action == "DELETE" { - // DELETE is proceeded by PUT or POST - err = applyConfig(instanceID, configPrev, profileChannel, "PUT") + _, err := applyConfig(instanceID, configNew, profileChannel, prevAction, resources) if err != nil { return pkgerrors.Wrap(err, "Delete Config failed") } + log.Printf("DELETE resources: %s", resources) _, err = cs.updateConfig(configPrev) if err != nil { return pkgerrors.Wrap(err, "Update Config DB Entry") @@ -380,7 +437,7 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { } for i := currentVersion; i > rollbackIndex; i-- { // Delete rolled back items - err = cvs.deleteConfigVersion() + err = cvs.deleteConfigVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Delete Config Version ") } @@ -389,12 +446,8 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { } // Tagit tags the current version with the tag provided -func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error { - - rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) - if err != nil { - return pkgerrors.Wrap(err, "Retrieving model info") - } +func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTagit) error { + log.Printf("[Config Tag It] Instance %s Config %s", instanceID, configName) lock, _ := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() @@ -402,39 +455,54 @@ func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + err := cvs.tagCurrentVersion(configName, tag.TagName) if err != nil { - return pkgerrors.Wrap(err, "Get Current Config Version ") + return pkgerrors.Wrap(err, "Tag of current version failed") } - tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tag.TagName) + return nil +} - err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) +// GetTagVersion returns the version associated with the tag +func (v *ConfigClient) GetTagVersion(instanceID, configName string, tagName string) (string, error) { + log.Printf("[Config Get Tag Version] Instance %s Config %s", instanceID, configName) + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + value, err := cvs.getTagVersion(configName, tagName) if err != nil { - return pkgerrors.Wrap(err, "TagIt store DB") + return "", pkgerrors.Wrap(err, "Tag of current version failed") } - return nil + + return value, nil } -// GetTagVersion returns the version associated with the tag -func (v *ConfigClient) GetTagVersion(instanceID, tagName string) (string, error) { +// Cleanup version used only when instance is being deleted. We do not pass errors and we try to delete data +func (v *ConfigClient) Cleanup(instanceID string) error { + log.Printf("[Config Cleanup] Instance %s", instanceID) + configs, err := v.List(instanceID) - rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) if err != nil { - return "", pkgerrors.Wrap(err, "Retrieving model info") + return pkgerrors.Wrap(err, "Retrieving active config list info") } - tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tagName) - value, err := db.Etcd.Get(tagKey) - if err != nil { - return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + for _, config := range configs { + err = v.DeleteAll(instanceID, config.ConfigName) + if err != nil { + log.Printf("Config %s delete failed: %s", config.ConfigName, err.Error()) + } } - return string(value), nil + + removeProfileData(instanceID) + + return nil } // ApplyAllConfig starts from first configuration version and applies all versions in sequence -func (v *ConfigClient) ApplyAllConfig(instanceID string) error { - +func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) error { + log.Printf("[Config Apply All] Instance %s Config %s", instanceID, configName) lock, profileChannel := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() @@ -442,8 +510,9 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Current Config Version ") } @@ -453,14 +522,19 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error { //Apply all configurations var i uint for i = 1; i <= currentVersion; i++ { - configNew, _, action, err := cvs.getConfigVersion(i) + configNew, _, action, resources, err := cvs.getConfigVersion(configName, i) if err != nil { return pkgerrors.Wrap(err, "Get Config Version") } - err = applyConfig(instanceID, configNew, profileChannel, action) + if action != "DELETE" { + resources = nil + } + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, configNew, profileChannel, action, resources) if err != nil { return pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("%s result: %s", action, appliedResources) } return nil } diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 30a480df..4fedb386 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -38,9 +38,10 @@ import ( //ConfigStore contains the values that will be stored in the database type configVersionDBContent struct { - ConfigNew Config `json:"config-new"` - ConfigPrev Config `json:"config-prev"` - Action string `json:"action"` // CRUD opration for this config + ConfigNew Config `json:"config-new"` + ConfigPrev Config `json:"config-prev"` + Action string `json:"action"` // CRUD opration for this config + Resources []helm.KubernetesResource `json:"resources"` } //ConfigStore to Store the Config @@ -52,11 +53,13 @@ type ConfigStore struct { //ConfigVersionStore to Store the Versions of the Config type ConfigVersionStore struct { instanceID string + configName string } type configResourceList struct { resourceTemplates []helm.KubernetesResourceTemplate - createdResources []helm.KubernetesResource + resources []helm.KubernetesResource + updatedResources chan []helm.KubernetesResource profile rb.Profile action string } @@ -71,6 +74,7 @@ const ( storeName = "config" tagCounter = "counter" tagVersion = "configversion" + tagName = "configtag" tagConfig = "configdata" ) @@ -222,10 +226,46 @@ func (c ConfigStore) deleteConfig() (Config, error) { return configPrev, nil } +//Cleanup stored data in etcd before instance is being deleted +func (c ConfigVersionStore) cleanupIstanceTags(configName string) error { + + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName) + err = db.Etcd.DeletePrefix(versionKey) + if err != nil { + log.Printf("Deleting versions of instance failed: %s", err.Error()) + } + + counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) + err = db.Etcd.DeletePrefix(counterKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName) + err = db.Etcd.DeletePrefix(nameKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + return nil +} + // Create a version for the configuration. If previous config provided that is also stored -func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { +func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) { - version, err := c.incrementVersion() + configName := "" + if configNew.ConfigName != "" { + configName = configNew.ConfigName + } else { + configName = configPrev.ConfigName + } + + version, err := c.incrementVersion(configName) if err != nil { return 0, pkgerrors.Wrap(err, "Get Next Version") @@ -234,12 +274,14 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac if err != nil { return 0, pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version))) + + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version))) var cs configVersionDBContent cs.Action = action cs.ConfigNew = configNew cs.ConfigPrev = configPrev + cs.Resources = resources //[]helm.KubernetesResource{} configValue, err := db.Serialize(cs) if err != nil { @@ -253,9 +295,9 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac } // Delete current version of the configuration. Configuration always deleted from top -func (c ConfigVersionStore) deleteConfigVersion() error { +func (c ConfigVersionStore) deleteConfigVersion(configName string) error { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Next Version") @@ -264,13 +306,13 @@ func (c ConfigVersionStore) deleteConfigVersion() error { if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(counter))) + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(counter))) err = db.Etcd.Delete(versionKey) if err != nil { return pkgerrors.Wrap(err, "Delete Config DB Entry") } - err = c.decrementVersion() + err = c.decrementVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Decrement Version") } @@ -279,37 +321,37 @@ func (c ConfigVersionStore) deleteConfigVersion() error { // Read the specified version of the configuration and return its prev and current value. // Also returns the action for the config version -func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) { +func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version))) + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version))) configBytes, err := db.Etcd.Get(versionKey) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ") } if configBytes != nil { pr := configVersionDBContent{} err = db.DeSerialize(string(configBytes), &pr) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version") } - return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil + return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil } - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ") } // Get the counter for the version -func (c ConfigVersionStore) getCurrentVersion() (uint, error) { +func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { return 0, pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) value, err := db.Etcd.Get(cfgKey) if err != nil { @@ -329,13 +371,13 @@ func (c ConfigVersionStore) getCurrentVersion() (uint, error) { } // Update the counter for the version -func (c ConfigVersionStore) updateVersion(counter uint) error { +func (c ConfigVersionStore) updateVersion(configName string, counter uint) error { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) if err != nil { return pkgerrors.Wrap(err, "Counter DB Entry") @@ -344,15 +386,15 @@ func (c ConfigVersionStore) updateVersion(counter uint) error { } // Increment the version counter -func (c ConfigVersionStore) incrementVersion() (uint, error) { +func (c ConfigVersionStore) incrementVersion(configName string) (uint, error) { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return 0, pkgerrors.Wrap(err, "Get Next Counter Value") } //This is done while Profile lock is taken counter++ - err = c.updateVersion(counter) + err = c.updateVersion(configName, counter) if err != nil { return 0, pkgerrors.Wrap(err, "Store Next Counter Value") } @@ -361,15 +403,15 @@ func (c ConfigVersionStore) incrementVersion() (uint, error) { } // Decrement the version counter -func (c ConfigVersionStore) decrementVersion() error { +func (c ConfigVersionStore) decrementVersion(configName string) error { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Next Counter Value") } //This is done while Profile lock is taken counter-- - err = c.updateVersion(counter) + err = c.updateVersion(configName, counter) if err != nil { return pkgerrors.Wrap(err, "Store Next Counter Value") } @@ -377,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion() error { return nil } +// Get tag version +func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) { + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return "", pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + value, err := db.Etcd.Get(tagKey) + if err != nil { + return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + } + return string(value), nil +} + +// Tag current version +func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error { + currentVersion, err := c.getCurrentVersion(configName) + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) + if err != nil { + return pkgerrors.Wrap(err, "TagIt store DB") + } + return nil +} + // Apply Config -func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error { +func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) { rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID) if err != nil { - return pkgerrors.Wrap(err, "Retrieving model info") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } // Get Template and Resolve the template with values crl, err := resolve(rbName, rbVersion, profileName, p, releaseName) if err != nil { - return pkgerrors.Wrap(err, "Resolve Config") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config") } + var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource) crl.action = action + crl.resources = resources + crl.updatedResources = updatedResources // Send the configResourceList to the channel. Using select for non-blocking channel + log.Printf("Before Sent to goroutine %v", crl.profile) select { case pChannel <- crl: log.Printf("Message Sent to goroutine %v", crl.profile) default: } - return nil + var resultResources []helm.KubernetesResource = <-updatedResources + return resultResources, nil } // Per Profile Go routine to apply the configuration to Cloud Region func scheduleResources(c chan configResourceList) { // Keep thread running + log.Printf("[scheduleResources]: START thread") for { data := <-c //TODO: ADD Check to see if Application running ic := NewInstanceClient() resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) - if err != nil || len(resp) == 0 { + if (err != nil || len(resp) == 0) && data.action != "STOP" { log.Println("Error finding a running instance. Retrying later...") - time.Sleep(time.Second * 10) + data.updatedResources <- []helm.KubernetesResource{} continue } + breakThread := false switch { case data.action == "POST": log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) + var resources []helm.KubernetesResource for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -425,11 +509,11 @@ func scheduleResources(c chan configResourceList) { continue } //assuming - the resource is not exist already - data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) errCreate := err if err != nil { // assuming - the err represent the resource is already exist, so going for update - data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) if err != nil { log.Printf("Error Creating resources: %s", errCreate.Error()) log.Printf("Error Updating resources: %s", err.Error()) @@ -437,12 +521,28 @@ func scheduleResources(c chan configResourceList) { } } } - //TODO: Needs to add code to call Kubectl create + data.updatedResources <- resources case data.action == "PUT": log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) - //TODO: Needs to add code to call Kubectl apply + var resources []helm.KubernetesResource + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + if err != nil { + log.Printf("Error Updating resources: %s", err.Error()) + continue + } + } + data.updatedResources <- resources case data.action == "DELETE": - log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) + log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources) for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -451,14 +551,22 @@ func scheduleResources(c chan configResourceList) { //Move onto the next cloud region continue } - err = k8sClient.deleteResources(data.createdResources, inst.Namespace) + err = k8sClient.deleteResources(data.resources, inst.Namespace) if err != nil { log.Printf("Error Deleting resources: %s", err.Error()) continue } } + data.updatedResources <- []helm.KubernetesResource{} + + case data.action == "STOP": + breakThread = true + } + if breakThread { + break } } + log.Printf("[scheduleResources]: STOP thread") } //Resolve returns the path where the helm chart merged with @@ -549,6 +657,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) { if !ok { profileData.resourceChannel[key] = make(chan configResourceList) go scheduleResources(profileData.resourceChannel[key]) + time.Sleep(time.Second * 5) } return profileData.profileLockMap[key], profileData.resourceChannel[key] } + +func removeProfileData(key string) { + profileData.Lock() + defer profileData.Unlock() + _, ok := profileData.profileLockMap[key] + if ok { + delete(profileData.profileLockMap, key) + } + _, ok = profileData.resourceChannel[key] + if ok { + log.Printf("Stop config thread for %s", key) + crl := configResourceList{ + action: "STOP", + } + profileData.resourceChannel[key] <- crl + delete(profileData.resourceChannel, key) + } +} diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go index 9ee96881..0cc3c3ce 100644 --- a/src/k8splugin/internal/app/config_test.go +++ b/src/k8splugin/internal/app/config_test.go @@ -293,7 +293,7 @@ func TestRollbackConfig(t *testing.T) { } } testCase.rollbackConfig.AnyOf.ConfigVersion = "2" - err = impl.Rollback(testCase.instanceID, testCase.rollbackConfig) + err = impl.Rollback(testCase.instanceID, testCase.inp.ConfigName, testCase.rollbackConfig) if err != nil { if testCase.expectedError == "" { t.Fatalf("Create returned an unexpected error %s", err) @@ -319,3 +319,6 @@ func TestRollbackConfig(t *testing.T) { }) } } + +func main() { +} diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index ad36aaa5..b7f382ad 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -709,7 +709,8 @@ func (v *InstanceClient) Delete(id string) error { return nil } else if inst.Status != "DONE" { //Recover is ongoing, do nothing here - return nil + //return nil + //TODO: implement recovery } k8sClient := KubernetesClient{} @@ -717,6 +718,7 @@ func (v *InstanceClient) Delete(id string) error { if err != nil { return pkgerrors.Wrap(err, "Getting CloudRegion Information") } + inst.Status = "PRE-DELETE" inst.HookProgress = "" err = db.DBconn.Update(v.storeName, key, v.tagInst, inst) @@ -743,6 +745,13 @@ func (v *InstanceClient) Delete(id string) error { if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName) } + + configClient := NewConfigClient() + err = configClient.Cleanup(id) + if err != nil { + return pkgerrors.Wrap(err, "Cleanup Config Resources") + } + err = k8sClient.deleteResources(inst.Resources, inst.Namespace) if err != nil { return pkgerrors.Wrap(err, "Deleting Instance Resources") |