summaryrefslogtreecommitdiffstats
path: root/src/k8splugin/internal/app
diff options
context:
space:
mode:
Diffstat (limited to 'src/k8splugin/internal/app')
-rw-r--r--src/k8splugin/internal/app/client.go24
-rw-r--r--src/k8splugin/internal/app/config.go210
-rw-r--r--src/k8splugin/internal/app/config_backend.go207
-rw-r--r--src/k8splugin/internal/app/config_test.go5
-rw-r--r--src/k8splugin/internal/app/instance.go11
5 files changed, 342 insertions, 115 deletions
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go
index 9813333e..06c4c464 100644
--- a/src/k8splugin/internal/app/client.go
+++ b/src/k8splugin/internal/app/client.go
@@ -20,25 +20,30 @@ package app
import (
"context"
"io/ioutil"
+
appsv1 "k8s.io/api/apps/v1"
+
//appsv1beta1 "k8s.io/api/apps/v1beta1"
//appsv1beta2 "k8s.io/api/apps/v1beta2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
+
//extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
//apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
//apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"
"time"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ logger "log"
+
"github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
- logger "log"
pkgerrors "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -545,9 +550,18 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac
return pkgerrors.Wrap(err, "Error loading plugin")
}
- err = pluginImpl.Delete(resource, namespace, k)
- if err != nil {
- return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+ name, err := pluginImpl.Get(resource, namespace, k)
+
+ if (err == nil && name == resource.Name) || (err != nil && strings.Contains(err.Error(), "not found") == false) {
+ err = pluginImpl.Delete(resource, namespace, k)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+ }
+ } else {
+ log.Warn("Resource does not exist, Skipping delete", log.Fields{
+ "gvk": resource.GVK,
+ "resource": resource.Name,
+ })
}
return nil
diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go
index 94acadcc..8952c16d 100644
--- a/src/k8splugin/internal/app/config.go
+++ b/src/k8splugin/internal/app/config.go
@@ -18,10 +18,11 @@
package app
import (
+ "log"
"strconv"
"strings"
- "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
pkgerrors "github.com/pkg/errors"
)
@@ -66,8 +67,10 @@ type ConfigManager interface {
Help() map[string]string
Update(instanceID, configName string, p Config) (ConfigResult, error)
Delete(instanceID, configName string) (ConfigResult, error)
- Rollback(instanceID string, p ConfigRollback) error
- Tagit(instanceID string, p ConfigTagit) error
+ DeleteAll(instanceID, configName string) error
+ Rollback(instanceID string, configName string, p ConfigRollback) error
+ Cleanup(instanceID string) error
+ Tagit(instanceID string, configName string, p ConfigTagit) error
}
// ConfigClient implements the ConfigManager
@@ -94,7 +97,7 @@ func (v *ConfigClient) Help() map[string]string {
// Create an entry for the config in the database
func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) {
-
+ log.Printf("[Config Create] Instance %s", instanceID)
// Check required fields
if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 {
return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -120,10 +123,12 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "POST")
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, p, profileChannel, "POST", nil)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("POST result: %s", appliedResources)
// Create Config DB Entry
err = cs.createConfig(p)
if err != nil {
@@ -132,8 +137,9 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
// Create Version Entry in DB for Config
cvs := ConfigVersionStore{
instanceID: instanceID,
+ configName: p.ConfigName,
}
- version, err := cvs.createConfigVersion(p, Config{}, "POST")
+ version, err := cvs.createConfigVersion(p, Config{}, "POST", appliedResources)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
}
@@ -153,7 +159,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
// Update an entry for the config in the database
func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigResult, error) {
-
+ log.Printf("[Config Update] Instance %s Config %s", instanceID, configName)
// Check required fields
if len(p.Values) == 0 {
return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -176,10 +182,12 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "PUT")
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, p, profileChannel, "PUT", nil)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("PUT result: %s", appliedResources)
// Update Config DB Entry
configPrev, err := cs.updateConfig(p)
if err != nil {
@@ -188,8 +196,9 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
// Create Version Entry in DB for Config
cvs := ConfigVersionStore{
instanceID: instanceID,
+ configName: configName,
}
- version, err := cvs.createConfigVersion(p, configPrev, "PUT")
+ version, err := cvs.createConfigVersion(p, configPrev, "PUT", appliedResources)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
}
@@ -245,8 +254,49 @@ func (v *ConfigClient) List(instanceID string) ([]Config, error) {
}
// Delete the Config from database
-func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+func (v *ConfigClient) DeleteAll(instanceID, configName string) error {
+ log.Printf("[Config Delete All] Instance %s Config %s", instanceID, configName)
+ // Check if Config exists
+ cs := ConfigStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ _, err := cs.getConfig()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ }
+ // Get Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ currentVersion, err := cvs.getCurrentVersion(configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Current version get failed")
+ }
+ _, _, action, _, err := cvs.getConfigVersion(configName, currentVersion)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Config version get failed")
+ }
+
+ if action != "DELETE" {
+ _, err = v.Delete(instanceID, configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Config DELETE version failed")
+ }
+ }
+ // Delete Config from DB
+ _, err = cs.deleteConfig()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config DB Entry")
+ }
+ cvs.cleanupIstanceTags(configName)
+ return nil
+}
+// Apply update with delete operation
+func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+ log.Printf("[Config Delete] Instance %s Config %s", instanceID, configName)
// Resolving rbName, Version, etc. not to break response
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
if err != nil {
@@ -259,28 +309,39 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
}
p, err := cs.getConfig()
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Delete Error - Config doesn't exist")
}
lock, profileChannel := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "DELETE")
+ // Create Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ currentVersion, err := cvs.getCurrentVersion(configName)
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Current version get failed")
}
- // Delete Config from DB
- configPrev, err := cs.deleteConfig()
+ _, _, _, resources, err := cvs.getConfigVersion(configName, currentVersion)
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Config version get failed")
}
- // Create Version Entry in DB for Config
- cvs := ConfigVersionStore{
- instanceID: instanceID,
+
+ _, err = applyConfig(instanceID, p, profileChannel, "DELETE", resources)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ log.Printf("DELETE resources: [%s]", resources)
+ // Update Config from DB
+ configPrev, err := cs.updateConfig(p)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry")
}
- version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE")
+ version, err := cvs.createConfigVersion(p, configPrev, "DELETE", []helm.KubernetesResource{})
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Delete Config Version DB Entry")
}
// Create Result structure
@@ -297,13 +358,13 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
}
// Rollback starts from current version and rollbacks to the version desired
-func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error {
-
+func (v *ConfigClient) Rollback(instanceID string, configName string, rback ConfigRollback) error {
+ log.Printf("[Config Rollback] Instance %s Config %s", instanceID, configName)
var reqVersion string
var err error
if rback.AnyOf.ConfigTag != "" {
- reqVersion, err = v.GetTagVersion(instanceID, rback.AnyOf.ConfigTag)
+ reqVersion, err = v.GetTagVersion(instanceID, configName, rback.AnyOf.ConfigTag)
if err != nil {
return pkgerrors.Wrap(err, "Rollback Invalid tag")
}
@@ -326,8 +387,9 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error {
cvs := ConfigVersionStore{
instanceID: instanceID,
+ configName: configName,
}
- currentVersion, err := cvs.getCurrentVersion()
+ currentVersion, err := cvs.getCurrentVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Rollback Get Current Config Version ")
}
@@ -338,40 +400,35 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error {
//Rollback all the intermettinent configurations
for i := currentVersion; i > rollbackIndex; i-- {
- configNew, configPrev, action, err := cvs.getConfigVersion(i)
+ configNew, configPrev, _, resources, err := cvs.getConfigVersion(configName, i)
if err != nil {
return pkgerrors.Wrap(err, "Rollback Get Config Version")
}
+ _, _, prevAction, _, err := cvs.getConfigVersion(configName, i-1)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Get Prev Config Version")
+ }
cs := ConfigStore{
instanceID: instanceID,
configName: configNew.ConfigName,
}
- if action == "PUT" {
- // PUT is proceeded by PUT or POST
- err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+ if prevAction != "DELETE" {
+ appliedResources, err := applyConfig(instanceID, configPrev, profileChannel, prevAction, nil)
if err != nil {
return pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("%s result: %s", prevAction, appliedResources)
_, err = cs.updateConfig(configPrev)
if err != nil {
return pkgerrors.Wrap(err, "Update Config DB Entry")
}
- } else if action == "POST" {
+ } else {
// POST is always preceeded by Config not existing
- err = applyConfig(instanceID, configNew, profileChannel, "DELETE")
- if err != nil {
- return pkgerrors.Wrap(err, "Delete Config failed")
- }
- _, err = cs.deleteConfig()
- if err != nil {
- return pkgerrors.Wrap(err, "Delete Config DB Entry")
- }
- } else if action == "DELETE" {
- // DELETE is proceeded by PUT or POST
- err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+ _, err := applyConfig(instanceID, configNew, profileChannel, prevAction, resources)
if err != nil {
return pkgerrors.Wrap(err, "Delete Config failed")
}
+ log.Printf("DELETE resources: %s", resources)
_, err = cs.updateConfig(configPrev)
if err != nil {
return pkgerrors.Wrap(err, "Update Config DB Entry")
@@ -380,7 +437,7 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error {
}
for i := currentVersion; i > rollbackIndex; i-- {
// Delete rolled back items
- err = cvs.deleteConfigVersion()
+ err = cvs.deleteConfigVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Delete Config Version ")
}
@@ -389,12 +446,8 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error {
}
// Tagit tags the current version with the tag provided
-func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error {
-
- rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
- if err != nil {
- return pkgerrors.Wrap(err, "Retrieving model info")
- }
+func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTagit) error {
+ log.Printf("[Config Tag It] Instance %s Config %s", instanceID, configName)
lock, _ := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
@@ -402,39 +455,54 @@ func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error {
cvs := ConfigVersionStore{
instanceID: instanceID,
+ configName: configName,
}
- currentVersion, err := cvs.getCurrentVersion()
+ err := cvs.tagCurrentVersion(configName, tag.TagName)
if err != nil {
- return pkgerrors.Wrap(err, "Get Current Config Version ")
+ return pkgerrors.Wrap(err, "Tag of current version failed")
}
- tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tag.TagName)
+ return nil
+}
- err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+// GetTagVersion returns the version associated with the tag
+func (v *ConfigClient) GetTagVersion(instanceID, configName string, tagName string) (string, error) {
+ log.Printf("[Config Get Tag Version] Instance %s Config %s", instanceID, configName)
+ cvs := ConfigVersionStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ value, err := cvs.getTagVersion(configName, tagName)
if err != nil {
- return pkgerrors.Wrap(err, "TagIt store DB")
+ return "", pkgerrors.Wrap(err, "Tag of current version failed")
}
- return nil
+
+ return value, nil
}
-// GetTagVersion returns the version associated with the tag
-func (v *ConfigClient) GetTagVersion(instanceID, tagName string) (string, error) {
+// Cleanup version used only when instance is being deleted. We do not pass errors and we try to delete data
+func (v *ConfigClient) Cleanup(instanceID string) error {
+ log.Printf("[Config Cleanup] Instance %s", instanceID)
+ configs, err := v.List(instanceID)
- rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
if err != nil {
- return "", pkgerrors.Wrap(err, "Retrieving model info")
+ return pkgerrors.Wrap(err, "Retrieving active config list info")
}
- tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tagName)
- value, err := db.Etcd.Get(tagKey)
- if err != nil {
- return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+ for _, config := range configs {
+ err = v.DeleteAll(instanceID, config.ConfigName)
+ if err != nil {
+ log.Printf("Config %s delete failed: %s", config.ConfigName, err.Error())
+ }
}
- return string(value), nil
+
+ removeProfileData(instanceID)
+
+ return nil
}
// ApplyAllConfig starts from first configuration version and applies all versions in sequence
-func (v *ConfigClient) ApplyAllConfig(instanceID string) error {
-
+func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) error {
+ log.Printf("[Config Apply All] Instance %s Config %s", instanceID, configName)
lock, profileChannel := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
@@ -442,8 +510,9 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error {
cvs := ConfigVersionStore{
instanceID: instanceID,
+ configName: configName,
}
- currentVersion, err := cvs.getCurrentVersion()
+ currentVersion, err := cvs.getCurrentVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Get Current Config Version ")
}
@@ -453,14 +522,19 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error {
//Apply all configurations
var i uint
for i = 1; i <= currentVersion; i++ {
- configNew, _, action, err := cvs.getConfigVersion(i)
+ configNew, _, action, resources, err := cvs.getConfigVersion(configName, i)
if err != nil {
return pkgerrors.Wrap(err, "Get Config Version")
}
- err = applyConfig(instanceID, configNew, profileChannel, action)
+ if action != "DELETE" {
+ resources = nil
+ }
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, configNew, profileChannel, action, resources)
if err != nil {
return pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("%s result: %s", action, appliedResources)
}
return nil
}
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go
index 30a480df..4fedb386 100644
--- a/src/k8splugin/internal/app/config_backend.go
+++ b/src/k8splugin/internal/app/config_backend.go
@@ -38,9 +38,10 @@ import (
//ConfigStore contains the values that will be stored in the database
type configVersionDBContent struct {
- ConfigNew Config `json:"config-new"`
- ConfigPrev Config `json:"config-prev"`
- Action string `json:"action"` // CRUD opration for this config
+ ConfigNew Config `json:"config-new"`
+ ConfigPrev Config `json:"config-prev"`
+ Action string `json:"action"` // CRUD opration for this config
+ Resources []helm.KubernetesResource `json:"resources"`
}
//ConfigStore to Store the Config
@@ -52,11 +53,13 @@ type ConfigStore struct {
//ConfigVersionStore to Store the Versions of the Config
type ConfigVersionStore struct {
instanceID string
+ configName string
}
type configResourceList struct {
resourceTemplates []helm.KubernetesResourceTemplate
- createdResources []helm.KubernetesResource
+ resources []helm.KubernetesResource
+ updatedResources chan []helm.KubernetesResource
profile rb.Profile
action string
}
@@ -71,6 +74,7 @@ const (
storeName = "config"
tagCounter = "counter"
tagVersion = "configversion"
+ tagName = "configtag"
tagConfig = "configdata"
)
@@ -222,10 +226,46 @@ func (c ConfigStore) deleteConfig() (Config, error) {
return configPrev, nil
}
+//Cleanup stored data in etcd before instance is being deleted
+func (c ConfigVersionStore) cleanupIstanceTags(configName string) error {
+
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Retrieving model info")
+ }
+
+ versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName)
+ err = db.Etcd.DeletePrefix(versionKey)
+ if err != nil {
+ log.Printf("Deleting versions of instance failed: %s", err.Error())
+ }
+
+ counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
+ err = db.Etcd.DeletePrefix(counterKey)
+ if err != nil {
+ log.Printf("Deleting counters of instance failed: %s", err.Error())
+ }
+
+ nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName)
+ err = db.Etcd.DeletePrefix(nameKey)
+ if err != nil {
+ log.Printf("Deleting counters of instance failed: %s", err.Error())
+ }
+
+ return nil
+}
+
// Create a version for the configuration. If previous config provided that is also stored
-func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
+func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) {
- version, err := c.incrementVersion()
+ configName := ""
+ if configNew.ConfigName != "" {
+ configName = configNew.ConfigName
+ } else {
+ configName = configPrev.ConfigName
+ }
+
+ version, err := c.incrementVersion(configName)
if err != nil {
return 0, pkgerrors.Wrap(err, "Get Next Version")
@@ -234,12 +274,14 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac
if err != nil {
return 0, pkgerrors.Wrap(err, "Retrieving model info")
}
- versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
+
+ versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version)))
var cs configVersionDBContent
cs.Action = action
cs.ConfigNew = configNew
cs.ConfigPrev = configPrev
+ cs.Resources = resources //[]helm.KubernetesResource{}
configValue, err := db.Serialize(cs)
if err != nil {
@@ -253,9 +295,9 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac
}
// Delete current version of the configuration. Configuration always deleted from top
-func (c ConfigVersionStore) deleteConfigVersion() error {
+func (c ConfigVersionStore) deleteConfigVersion(configName string) error {
- counter, err := c.getCurrentVersion()
+ counter, err := c.getCurrentVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Get Next Version")
@@ -264,13 +306,13 @@ func (c ConfigVersionStore) deleteConfigVersion() error {
if err != nil {
return pkgerrors.Wrap(err, "Retrieving model info")
}
- versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(counter)))
+ versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(counter)))
err = db.Etcd.Delete(versionKey)
if err != nil {
return pkgerrors.Wrap(err, "Delete Config DB Entry")
}
- err = c.decrementVersion()
+ err = c.decrementVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Decrement Version")
}
@@ -279,37 +321,37 @@ func (c ConfigVersionStore) deleteConfigVersion() error {
// Read the specified version of the configuration and return its prev and current value.
// Also returns the action for the config version
-func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) {
+func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) {
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
}
- versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
+ versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version)))
configBytes, err := db.Etcd.Get(versionKey)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ")
}
if configBytes != nil {
pr := configVersionDBContent{}
err = db.DeSerialize(string(configBytes), &pr)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version")
}
- return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
+ return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil
}
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ")
}
// Get the counter for the version
-func (c ConfigVersionStore) getCurrentVersion() (uint, error) {
+func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) {
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
if err != nil {
return 0, pkgerrors.Wrap(err, "Retrieving model info")
}
- cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
+ cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
value, err := db.Etcd.Get(cfgKey)
if err != nil {
@@ -329,13 +371,13 @@ func (c ConfigVersionStore) getCurrentVersion() (uint, error) {
}
// Update the counter for the version
-func (c ConfigVersionStore) updateVersion(counter uint) error {
+func (c ConfigVersionStore) updateVersion(configName string, counter uint) error {
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
if err != nil {
return pkgerrors.Wrap(err, "Retrieving model info")
}
- cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
+ cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
if err != nil {
return pkgerrors.Wrap(err, "Counter DB Entry")
@@ -344,15 +386,15 @@ func (c ConfigVersionStore) updateVersion(counter uint) error {
}
// Increment the version counter
-func (c ConfigVersionStore) incrementVersion() (uint, error) {
+func (c ConfigVersionStore) incrementVersion(configName string) (uint, error) {
- counter, err := c.getCurrentVersion()
+ counter, err := c.getCurrentVersion(configName)
if err != nil {
return 0, pkgerrors.Wrap(err, "Get Next Counter Value")
}
//This is done while Profile lock is taken
counter++
- err = c.updateVersion(counter)
+ err = c.updateVersion(configName, counter)
if err != nil {
return 0, pkgerrors.Wrap(err, "Store Next Counter Value")
}
@@ -361,15 +403,15 @@ func (c ConfigVersionStore) incrementVersion() (uint, error) {
}
// Decrement the version counter
-func (c ConfigVersionStore) decrementVersion() error {
+func (c ConfigVersionStore) decrementVersion(configName string) error {
- counter, err := c.getCurrentVersion()
+ counter, err := c.getCurrentVersion(configName)
if err != nil {
return pkgerrors.Wrap(err, "Get Next Counter Value")
}
//This is done while Profile lock is taken
counter--
- err = c.updateVersion(counter)
+ err = c.updateVersion(configName, counter)
if err != nil {
return pkgerrors.Wrap(err, "Store Next Counter Value")
}
@@ -377,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion() error {
return nil
}
+// Get tag version
+func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) {
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Retrieving model info")
+ }
+ tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+ value, err := db.Etcd.Get(tagKey)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+ }
+ return string(value), nil
+}
+
+// Tag current version
+func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error {
+ currentVersion, err := c.getCurrentVersion(configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Current Config Version ")
+ }
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Retrieving model info")
+ }
+ tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+ err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+ if err != nil {
+ return pkgerrors.Wrap(err, "TagIt store DB")
+ }
+ return nil
+}
+
// Apply Config
-func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
+func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) {
rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID)
if err != nil {
- return pkgerrors.Wrap(err, "Retrieving model info")
+ return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
}
// Get Template and Resolve the template with values
crl, err := resolve(rbName, rbVersion, profileName, p, releaseName)
if err != nil {
- return pkgerrors.Wrap(err, "Resolve Config")
+ return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config")
}
+ var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource)
crl.action = action
+ crl.resources = resources
+ crl.updatedResources = updatedResources
// Send the configResourceList to the channel. Using select for non-blocking channel
+ log.Printf("Before Sent to goroutine %v", crl.profile)
select {
case pChannel <- crl:
log.Printf("Message Sent to goroutine %v", crl.profile)
default:
}
- return nil
+ var resultResources []helm.KubernetesResource = <-updatedResources
+ return resultResources, nil
}
// Per Profile Go routine to apply the configuration to Cloud Region
func scheduleResources(c chan configResourceList) {
// Keep thread running
+ log.Printf("[scheduleResources]: START thread")
for {
data := <-c
//TODO: ADD Check to see if Application running
ic := NewInstanceClient()
resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
- if err != nil || len(resp) == 0 {
+ if (err != nil || len(resp) == 0) && data.action != "STOP" {
log.Println("Error finding a running instance. Retrying later...")
- time.Sleep(time.Second * 10)
+ data.updatedResources <- []helm.KubernetesResource{}
continue
}
+ breakThread := false
switch {
case data.action == "POST":
log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
+ var resources []helm.KubernetesResource
for _, inst := range resp {
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -425,11 +509,11 @@ func scheduleResources(c chan configResourceList) {
continue
}
//assuming - the resource is not exist already
- data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
+ resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
errCreate := err
if err != nil {
// assuming - the err represent the resource is already exist, so going for update
- data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+ resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
if err != nil {
log.Printf("Error Creating resources: %s", errCreate.Error())
log.Printf("Error Updating resources: %s", err.Error())
@@ -437,12 +521,28 @@ func scheduleResources(c chan configResourceList) {
}
}
}
- //TODO: Needs to add code to call Kubectl create
+ data.updatedResources <- resources
case data.action == "PUT":
log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
- //TODO: Needs to add code to call Kubectl apply
+ var resources []helm.KubernetesResource
+ for _, inst := range resp {
+ k8sClient := KubernetesClient{}
+ err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
+ if err != nil {
+ log.Printf("Getting CloudRegion Information: %s", err.Error())
+ //Move onto the next cloud region
+ continue
+ }
+
+ resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+ if err != nil {
+ log.Printf("Error Updating resources: %s", err.Error())
+ continue
+ }
+ }
+ data.updatedResources <- resources
case data.action == "DELETE":
- log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
+ log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources)
for _, inst := range resp {
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -451,14 +551,22 @@ func scheduleResources(c chan configResourceList) {
//Move onto the next cloud region
continue
}
- err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
+ err = k8sClient.deleteResources(data.resources, inst.Namespace)
if err != nil {
log.Printf("Error Deleting resources: %s", err.Error())
continue
}
}
+ data.updatedResources <- []helm.KubernetesResource{}
+
+ case data.action == "STOP":
+ breakThread = true
+ }
+ if breakThread {
+ break
}
}
+ log.Printf("[scheduleResources]: STOP thread")
}
//Resolve returns the path where the helm chart merged with
@@ -549,6 +657,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
if !ok {
profileData.resourceChannel[key] = make(chan configResourceList)
go scheduleResources(profileData.resourceChannel[key])
+ time.Sleep(time.Second * 5)
}
return profileData.profileLockMap[key], profileData.resourceChannel[key]
}
+
+func removeProfileData(key string) {
+ profileData.Lock()
+ defer profileData.Unlock()
+ _, ok := profileData.profileLockMap[key]
+ if ok {
+ delete(profileData.profileLockMap, key)
+ }
+ _, ok = profileData.resourceChannel[key]
+ if ok {
+ log.Printf("Stop config thread for %s", key)
+ crl := configResourceList{
+ action: "STOP",
+ }
+ profileData.resourceChannel[key] <- crl
+ delete(profileData.resourceChannel, key)
+ }
+}
diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go
index 9ee96881..0cc3c3ce 100644
--- a/src/k8splugin/internal/app/config_test.go
+++ b/src/k8splugin/internal/app/config_test.go
@@ -293,7 +293,7 @@ func TestRollbackConfig(t *testing.T) {
}
}
testCase.rollbackConfig.AnyOf.ConfigVersion = "2"
- err = impl.Rollback(testCase.instanceID, testCase.rollbackConfig)
+ err = impl.Rollback(testCase.instanceID, testCase.inp.ConfigName, testCase.rollbackConfig)
if err != nil {
if testCase.expectedError == "" {
t.Fatalf("Create returned an unexpected error %s", err)
@@ -319,3 +319,6 @@ func TestRollbackConfig(t *testing.T) {
})
}
}
+
+func main() {
+}
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index ad36aaa5..b7f382ad 100644
--- a/src/k8splugin/internal/app/instance.go
+++ b/src/k8splugin/internal/app/instance.go
@@ -709,7 +709,8 @@ func (v *InstanceClient) Delete(id string) error {
return nil
} else if inst.Status != "DONE" {
//Recover is ongoing, do nothing here
- return nil
+ //return nil
+ //TODO: implement recovery
}
k8sClient := KubernetesClient{}
@@ -717,6 +718,7 @@ func (v *InstanceClient) Delete(id string) error {
if err != nil {
return pkgerrors.Wrap(err, "Getting CloudRegion Information")
}
+
inst.Status = "PRE-DELETE"
inst.HookProgress = ""
err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
@@ -743,6 +745,13 @@ func (v *InstanceClient) Delete(id string) error {
if err != nil {
log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
}
+
+ configClient := NewConfigClient()
+ err = configClient.Cleanup(id)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Cleanup Config Resources")
+ }
+
err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
if err != nil {
return pkgerrors.Wrap(err, "Deleting Instance Resources")