From 401aba14c5f5e55480afb491af2bf953cabc6ac2 Mon Sep 17 00:00:00 2001 From: Kiran Kamineni Date: Wed, 15 May 2019 16:47:39 -0700 Subject: Move config to app and connect to instance Move config instantiation to app and connect it to the instance to allow updates and so on. Issue-ID: MULTICLOUD-464 Change-Id: Ic994ef78a6e0d2db5e695e33b7b8a302c74c10da Signed-off-by: Kiran Kamineni --- src/k8splugin/api/api.go | 4 +- src/k8splugin/api/confighandler.go | 15 +- src/k8splugin/internal/app/config.go | 438 ++++++++++++++++++++++++ src/k8splugin/internal/app/config_backend.go | 475 +++++++++++++++++++++++++++ src/k8splugin/internal/app/config_test.go | 259 +++++++++++++++ src/k8splugin/internal/rb/config.go | 436 ------------------------ src/k8splugin/internal/rb/config_backend.go | 439 ------------------------- src/k8splugin/internal/rb/config_test.go | 259 --------------- 8 files changed, 1182 insertions(+), 1143 deletions(-) create mode 100644 src/k8splugin/internal/app/config.go create mode 100644 src/k8splugin/internal/app/config_backend.go create mode 100644 src/k8splugin/internal/app/config_test.go delete mode 100644 src/k8splugin/internal/rb/config.go delete mode 100644 src/k8splugin/internal/rb/config_backend.go delete mode 100644 src/k8splugin/internal/rb/config_test.go diff --git a/src/k8splugin/api/api.go b/src/k8splugin/api/api.go index 5fed28a0..0ddbcf83 100644 --- a/src/k8splugin/api/api.go +++ b/src/k8splugin/api/api.go @@ -25,7 +25,7 @@ import ( func NewRouter(defClient rb.DefinitionManager, profileClient rb.ProfileManager, instClient app.InstanceManager, - configClient rb.ConfigManager, + configClient app.ConfigManager, templateClient rb.ConfigTemplateManager) *mux.Router { router := mux.NewRouter() @@ -91,7 +91,7 @@ func NewRouter(defClient rb.DefinitionManager, // Config value if configClient == nil { - configClient = rb.NewConfigClient() + configClient = app.NewConfigClient() } configHandler := rbConfigHandler{client: configClient} resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile/{prname}/config", configHandler.createHandler).Methods("POST") diff --git a/src/k8splugin/api/confighandler.go b/src/k8splugin/api/confighandler.go index 93098d61..9bd9db83 100644 --- a/src/k8splugin/api/confighandler.go +++ b/src/k8splugin/api/confighandler.go @@ -18,9 +18,10 @@ package api import ( "encoding/json" - "k8splugin/internal/rb" "net/http" + "k8splugin/internal/app" + "github.com/gorilla/mux" ) @@ -29,12 +30,12 @@ import ( type rbConfigHandler struct { // Interface that implements bundle Definition operations // We will set this variable with a mock interface for testing - client rb.ConfigManager + client app.ConfigManager } // createHandler handles creation of the definition entry in the database func (h rbConfigHandler) createHandler(w http.ResponseWriter, r *http.Request) { - var p rb.Config + var p app.Config vars := mux.Vars(r) rbName := vars["rbname"] rbVersion := vars["rbversion"] @@ -73,7 +74,7 @@ func (h rbConfigHandler) createHandler(w http.ResponseWriter, r *http.Request) { } // getHandler handles GET operations on a particular config -// Returns a rb.Definition +// Returns a app.Definition func (h rbConfigHandler) getHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) rbName := vars["rbname"] @@ -128,7 +129,7 @@ func (h rbConfigHandler) updateHandler(w http.ResponseWriter, r *http.Request) { prName := vars["prname"] cfgName := vars["cfgname"] - var p rb.Config + var p app.Config if r.Body == nil { http.Error(w, "Empty body", http.StatusBadRequest) @@ -168,7 +169,7 @@ func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request) return } - var p rb.ConfigRollback + var p app.ConfigRollback err := json.NewDecoder(r.Body).Decode(&p) if err != nil { http.Error(w, err.Error(), http.StatusUnprocessableEntity) @@ -194,7 +195,7 @@ func (h rbConfigHandler) tagitHandler(w http.ResponseWriter, r *http.Request) { return } - var p rb.ConfigTagit + var p app.ConfigTagit err := json.NewDecoder(r.Body).Decode(&p) if err != nil { http.Error(w, err.Error(), http.StatusUnprocessableEntity) diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go new file mode 100644 index 00000000..f7e81358 --- /dev/null +++ b/src/k8splugin/internal/app/config.go @@ -0,0 +1,438 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "strconv" + "strings" + + "k8splugin/internal/db" + + pkgerrors "github.com/pkg/errors" +) + +// Config contains the parameters needed for configuration +type Config struct { + ConfigName string `json:"config-name"` + TemplateName string `json:"template-name"` + Description string `json:"description"` + Values map[string]interface{} `json:"values"` +} + +//ConfigResult output for Create, Update and delete +type ConfigResult struct { + DefinitionName string `json:"rb-name"` + DefinitionVersion string `json:"rb-version"` + ProfileName string `json:"profile-name"` + ConfigName string `json:"config-name"` + TemplateName string `json:"template-name"` + ConfigVersion uint `json:"config-verion"` +} + +//ConfigRollback input +type ConfigRollback struct { + AnyOf struct { + ConfigVersion string `json:"config-version,omitempty"` + ConfigTag string `json:"config-tag,omitempty"` + } `json:"anyOf"` +} + +//ConfigTagit for Tagging configurations +type ConfigTagit struct { + TagName string `json:"tag-name"` +} + +// ConfigManager is an interface exposes the config functionality +type ConfigManager interface { + Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) + Get(rbName, rbVersion, profileName, configName string) (Config, error) + Help() map[string]string + Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) + Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) + Rollback(rbName, rbVersion, profileName string, p ConfigRollback) error + Tagit(rbName, rbVersion, profileName string, p ConfigTagit) error +} + +// ConfigClient implements the ConfigManager +// It will also be used to maintain some localized state +type ConfigClient struct { + tagTag string +} + +// NewConfigClient returns an instance of the ConfigClient +// which implements the ConfigManager +func NewConfigClient() *ConfigClient { + return &ConfigClient{ + tagTag: "tag", + } +} + +// Help returns some information on how to create the content +// for the config in the form of html formatted page +func (v *ConfigClient) Help() map[string]string { + ret := make(map[string]string) + + return ret +} + +// Create an entry for the config in the database +func (v *ConfigClient) Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) { + + // Check required fields + if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 { + return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") + } + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: p.ConfigName, + } + _, err := cs.getConfig() + if err == nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Error - Config exists") + } else { + if strings.Contains(err.Error(), "Key doesn't exist") == false { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Error") + } + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "POST") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Create Config DB Entry + err = cs.createConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(p, Config{}, "POST") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: p.ConfigName, + TemplateName: p.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Update an entry for the config in the database +func (v *ConfigClient) Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) { + + // Check required fields + if len(p.Values) == 0 { + return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") + } + // Check if Config exists + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + _, err := cs.getConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "PUT") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Update Config DB Entry + configPrev, err := cs.updateConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(p, configPrev, "PUT") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: p.ConfigName, + TemplateName: p.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Get config entry in the database +func (v *ConfigClient) Get(rbName, rbVersion, profileName, configName string) (Config, error) { + + // Acquire per profile Mutex + lock, _ := getProfileData(rbName + rbVersion + profileName) + lock.Lock() + defer lock.Unlock() + // Read Config DB + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + cfg, err := cs.getConfig() + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") + } + return cfg, nil +} + +// Delete the Config from database +func (v *ConfigClient) Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) { + + // Check if Config exists + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + p, err := cs.getConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "DELETE") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Delete Config from DB + configPrev, err := cs.deleteConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: configName, + TemplateName: configPrev.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Rollback starts from current version and rollbacks to the version desired +func (v *ConfigClient) Rollback(rbName, rbVersion, profileName string, rback ConfigRollback) error { + + var reqVersion string + var err error + + if rback.AnyOf.ConfigTag != "" { + reqVersion, err = v.GetTagVersion(rbName, rbVersion, profileName, rback.AnyOf.ConfigTag) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Invalid tag") + } + } else if rback.AnyOf.ConfigVersion != "" { + reqVersion = rback.AnyOf.ConfigVersion + } else { + return pkgerrors.Errorf("No valid Index for Rollback") + } + + index, err := strconv.Atoi(reqVersion) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Invalid Index") + } + rollbackIndex := uint(index) + + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Current Config Version ") + } + + if rollbackIndex < 1 && rollbackIndex >= currentVersion { + return pkgerrors.Wrap(err, "Rollback Invalid Config Version") + } + + //Rollback all the intermettinent configurations + for i := currentVersion; i > rollbackIndex; i-- { + configNew, configPrev, action, err := cvs.getConfigVersion(i) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Config Version") + } + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configNew.ConfigName, + } + if action == "PUT" { + // PUT is proceeded by PUT or POST + err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") + if err != nil { + return pkgerrors.Wrap(err, "Apply Config failed") + } + _, err = cs.updateConfig(configPrev) + if err != nil { + return pkgerrors.Wrap(err, "Update Config DB Entry") + } + } else if action == "POST" { + // POST is always preceeded by Config not existing + err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, "DELETE") + if err != nil { + return pkgerrors.Wrap(err, "Delete Config failed") + } + _, err = cs.deleteConfig() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + } else if action == "DELETE" { + // DELETE is proceeded by PUT or POST + err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") + if err != nil { + return pkgerrors.Wrap(err, "Delete Config failed") + } + _, err = cs.updateConfig(configPrev) + if err != nil { + return pkgerrors.Wrap(err, "Update Config DB Entry") + } + } + } + for i := currentVersion; i > rollbackIndex; i-- { + // Delete rolled back items + err = cvs.deleteConfigVersion() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config Version ") + } + } + return nil +} + +// Tagit tags the current version with the tag provided +func (v *ConfigClient) Tagit(rbName, rbVersion, profileName string, tag ConfigTagit) error { + + lock, _ := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tag.TagName) + + err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) + if err != nil { + return pkgerrors.Wrap(err, "TagIt store DB") + } + return nil +} + +// GetTagVersion returns the version associated with the tag +func (v *ConfigClient) GetTagVersion(rbName, rbVersion, profileName, tagName string) (string, error) { + + tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tagName) + + value, err := db.Etcd.Get(tagKey) + if err != nil { + return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + } + return string(value), nil +} + +// ApplyAllConfig starts from first configuration version and applies all versions in sequence +func (v *ConfigClient) ApplyAllConfig(rbName, rbVersion, profileName string) error { + + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + if currentVersion < 1 { + return pkgerrors.Wrap(err, "No Config Version to Apply") + } + //Apply all configurations + var i uint + for i = 1; i <= currentVersion; i++ { + configNew, _, action, err := cvs.getConfigVersion(i) + if err != nil { + return pkgerrors.Wrap(err, "Get Config Version") + } + err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, action) + if err != nil { + return pkgerrors.Wrap(err, "Apply Config failed") + } + } + return nil +} diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go new file mode 100644 index 00000000..763aed0d --- /dev/null +++ b/src/k8splugin/internal/app/config_backend.go @@ -0,0 +1,475 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "log" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "k8splugin/internal/db" + "k8splugin/internal/helm" + "k8splugin/internal/rb" + + "github.com/ghodss/yaml" + pkgerrors "github.com/pkg/errors" +) + +//ConfigStore contains the values that will be stored in the database +type configVersionDBContent struct { + ConfigNew Config `json:"config-new"` + ConfigPrev Config `json:"config-prev"` + Action string `json:"action"` // CRUD opration for this config +} + +//ConfigStore to Store the Config +type ConfigStore struct { + rbName string + rbVersion string + profileName string + configName string +} + +//ConfigVersionStore to Store the Versions of the Config +type ConfigVersionStore struct { + rbName string + rbVersion string + profileName string +} + +type configResourceList struct { + resourceTemplates []helm.KubernetesResourceTemplate + createdResources []helm.KubernetesResource + profile rb.Profile + action string +} + +type profileDataManager struct { + profileLockMap map[string]*sync.Mutex + resourceChannel map[string](chan configResourceList) + sync.Mutex +} + +const ( + storeName = "config" + tagCounter = "counter" + tagVersion = "configversion" + tagConfig = "configdata" +) + +var profileData = profileDataManager{ + profileLockMap: map[string]*sync.Mutex{}, + resourceChannel: map[string]chan configResourceList{}, +} + +// Construct key for storing data +func constructKey(strs ...string) string { + + var sb strings.Builder + sb.WriteString("onapk8s") + sb.WriteString("/") + sb.WriteString(storeName) + sb.WriteString("/") + for _, str := range strs { + sb.WriteString(str) + sb.WriteString("/") + } + return sb.String() + +} + +// Create an entry for the config in the database +func (c ConfigStore) createConfig(p Config) error { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) + _, err := db.Etcd.Get(cfgKey) + if err == nil { + return pkgerrors.Wrap(err, "Config DB Entry Already exists") + } + configValue, err := db.Serialize(p) + if err != nil { + return pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(cfgKey, configValue) + if err != nil { + return pkgerrors.Wrap(err, "Config DB Entry") + } + return nil +} + +// Update the config entry in the database. Updates with the new value +// Returns the previous value of the Config +func (c ConfigStore) updateConfig(p Config) (Config, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) + value, err := db.Etcd.Get(cfgKey) + configPrev := Config{} + if err == nil { + // If updating Config after rollback then previous config may not exist + err = db.DeSerialize(string(value), &configPrev) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") + } + } + configValue, err := db.Serialize(p) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(cfgKey, configValue) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry") + } + return configPrev, nil +} + +// Read the config entry in the database +func (c ConfigStore) getConfig() (Config, error) { + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) + value, err := db.Etcd.Get(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") + } + //value is a byte array + if value != nil { + cfg := Config{} + err = db.DeSerialize(string(value), &cfg) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value") + } + return cfg, nil + } + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") +} + +// Delete the config entry in the database +func (c ConfigStore) deleteConfig() (Config, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) + value, err := db.Etcd.Get(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found") + } + configPrev := Config{} + err = db.DeSerialize(string(value), &configPrev) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") + } + + err = db.Etcd.Delete(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry") + } + return configPrev, nil +} + +// Create a version for the configuration. If previous config provided that is also stored +func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { + + version, err := c.incrementVersion() + + if err != nil { + return 0, pkgerrors.Wrap(err, "Get Next Version") + } + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) + + var cs configVersionDBContent + cs.Action = action + cs.ConfigNew = configNew + cs.ConfigPrev = configPrev + + configValue, err := db.Serialize(cs) + if err != nil { + return 0, pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(versionKey, configValue) + if err != nil { + return 0, pkgerrors.Wrap(err, "Create Config DB Entry") + } + return version, nil +} + +// Delete current version of the configuration. Configuration always deleted from top +func (c ConfigVersionStore) deleteConfigVersion() error { + + counter, err := c.getCurrentVersion() + + if err != nil { + return pkgerrors.Wrap(err, "Get Next Version") + } + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(counter))) + + err = db.Etcd.Delete(versionKey) + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + err = c.decrementVersion() + if err != nil { + return pkgerrors.Wrap(err, "Decrement Version") + } + return nil +} + +// Read the specified version of the configuration and return its prev and current value. +// Also returns the action for the config version +func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) { + + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) + configBytes, err := db.Etcd.Get(versionKey) + if err != nil { + return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") + } + + if configBytes != nil { + pr := configVersionDBContent{} + err = db.DeSerialize(string(configBytes), &pr) + if err != nil { + return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") + } + return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil + } + return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") +} + +// Get the counter for the version +func (c ConfigVersionStore) getCurrentVersion() (uint, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) + + value, err := db.Etcd.Get(cfgKey) + if err != nil { + if strings.Contains(err.Error(), "Key doesn't exist") == true { + // Counter not started yet, 0 is invalid value + return 0, nil + } else { + return 0, pkgerrors.Wrap(err, "Get Current Version") + } + } + + index, err := strconv.Atoi(string(value)) + if err != nil { + return 0, pkgerrors.Wrap(err, "Invalid counter") + } + return uint(index), nil +} + +// Update the counter for the version +func (c ConfigVersionStore) updateVersion(counter uint) error { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) + err := db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) + if err != nil { + return pkgerrors.Wrap(err, "Counter DB Entry") + } + return nil +} + +// Increment the version counter +func (c ConfigVersionStore) incrementVersion() (uint, error) { + + counter, err := c.getCurrentVersion() + if err != nil { + return 0, pkgerrors.Wrap(err, "Get Next Counter Value") + } + //This is done while Profile lock is taken + counter++ + err = c.updateVersion(counter) + if err != nil { + return 0, pkgerrors.Wrap(err, "Store Next Counter Value") + } + + return counter, nil +} + +// Decrement the version counter +func (c ConfigVersionStore) decrementVersion() error { + + counter, err := c.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Next Counter Value") + } + //This is done while Profile lock is taken + counter-- + err = c.updateVersion(counter) + if err != nil { + return pkgerrors.Wrap(err, "Store Next Counter Value") + } + + return nil +} + +// Apply Config +func applyConfig(rbName, rbVersion, profileName string, p Config, pChannel chan configResourceList, action string) error { + + // Get Template and Resolve the template with values + crl, err := resolve(rbName, rbVersion, profileName, p) + if err != nil { + return pkgerrors.Wrap(err, "Resolve Config") + } + crl.action = action + // Send the configResourceList to the channel. Using select for non-blocking channel + select { + case pChannel <- crl: + log.Printf("Message Sent to goroutine %v", crl.profile) + default: + } + + return nil +} + +// Per Profile Go routine to apply the configuration to Cloud Region +func scheduleResources(c chan configResourceList) { + // Keep thread running + for { + data := <-c + //TODO: ADD Check to see if Application running + ic := NewInstanceClient() + resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName) + if err != nil || len(resp) == 0 { + log.Println("Error finding a running instance. Retrying later...") + time.Sleep(time.Second * 10) + continue + } + switch { + case data.action == "POST": + log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.init(inst.CloudRegion) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) + if err != nil { + log.Printf("Error Creating resources: %s", err.Error()) + continue + } + } + //TODO: Needs to add code to call Kubectl create + case data.action == "PUT": + log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) + //TODO: Needs to add code to call Kubectl apply + case data.action == "DELETE": + log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.init(inst.CloudRegion) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + err = k8sClient.deleteResources(data.createdResources, inst.Namespace) + if err != nil { + log.Printf("Error Deleting resources: %s", err.Error()) + continue + } + } + } + } +} + +//Resolve returns the path where the helm chart merged with +//configuration overrides resides. +var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + + var resTemplates []helm.KubernetesResourceTemplate + + profile, err := rb.NewProfileClient().Get(rbName, rbVersion, profileName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data") + } + + t, err := rb.NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Getting Template") + } + if t.ChartName == "" { + return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found") + } + + def, err := rb.NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template") + } + + //Create a temp file in the system temp folder for values input + b, err := json.Marshal(p.Values) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data") + } + data, err := yaml.JSONToYAML(b) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML") + } + + outputfile, err := ioutil.TempFile("", "helm-config-values-") + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file") + } + _, err = outputfile.Write([]byte(data)) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file") + } + defer outputfile.Close() + + chartBasePath, err := rb.ExtractTarBall(bytes.NewBuffer(def)) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template") + } + + helmClient := helm.NewTemplateClient(profile.KubernetesVersion, + profile.Namespace, + profile.ReleaseName) + + chartPath := filepath.Join(chartBasePath, t.ChartName) + resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath, + []string{outputfile.Name()}, + nil) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml") + } + crl := configResourceList{ + resourceTemplates: resTemplates, + profile: profile, + } + + return crl, nil +} + +// Get the Mutex for the Profile +func getProfileData(key string) (*sync.Mutex, chan configResourceList) { + profileData.Lock() + defer profileData.Unlock() + _, ok := profileData.profileLockMap[key] + if !ok { + profileData.profileLockMap[key] = &sync.Mutex{} + } + _, ok = profileData.resourceChannel[key] + if !ok { + profileData.resourceChannel[key] = make(chan configResourceList) + go scheduleResources(profileData.resourceChannel[key]) + } + return profileData.profileLockMap[key], profileData.resourceChannel[key] +} diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go new file mode 100644 index 00000000..11a300ff --- /dev/null +++ b/src/k8splugin/internal/app/config_test.go @@ -0,0 +1,259 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "k8splugin/internal/db" + "reflect" + "strings" + "testing" + // pkgerrors "github.com/pkg/errors" +) + +func TestCreateConfig(t *testing.T) { + testCases := []struct { + label string + rbName string + rbVersion string + profileName string + inp Config + expectedError string + mockdb *db.MockEtcdClient + expected ConfigResult + }{ + { + label: "Create Config", + rbName: "testdef1", + rbVersion: "v1", + profileName: "testprofile1", + inp: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, + }, + expected: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 1, + }, + expectedError: "", + mockdb: &db.MockEtcdClient{ + Items: nil, + Err: nil, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.label, func(t *testing.T) { + db.Etcd = testCase.mockdb + resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + return configResourceList{}, nil + } + impl := NewConfigClient() + got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected) + } + } + }) + } +} + +func TestRollbackConfig(t *testing.T) { + testCases := []struct { + label string + rbName string + rbVersion string + profileName string + inp Config + inpUpdate1 Config + inpUpdate2 Config + expectedError string + mockdb *db.MockEtcdClient + expected1 ConfigResult + expected2 ConfigResult + expected3 ConfigResult + expected4 ConfigResult + rollbackConfig ConfigRollback + }{ + { + label: "Rollback Config", + rbName: "testdef1", + rbVersion: "v1", + profileName: "testprofile1", + inp: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, + }, + inpUpdate1: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 20,\"replicas\": 2, }}"}, + }, + inpUpdate2: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 30,\"replicas\": 2, }}"}, + }, + expected1: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 1, + }, + expected2: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 2, + }, + expected3: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 3, + }, + expected4: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 4, + }, + expectedError: "", + mockdb: &db.MockEtcdClient{ + Items: nil, + Err: nil, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.label, func(t *testing.T) { + db.Etcd = testCase.mockdb + resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + return configResourceList{}, nil + } + impl := NewConfigClient() + got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected1, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected1) + } + } + got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate1) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected2, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected2) + } + } + got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate2) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected3, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected3) + } + } + got, err = impl.Delete(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected4, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected4) + } + } + testCase.rollbackConfig.AnyOf.ConfigVersion = "2" + err = impl.Rollback(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.rollbackConfig) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } + rollbackConfig, err := impl.Get(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.inpUpdate1, rollbackConfig) == false { + t.Errorf("Rollback config failed: got %v;"+ + " expected %v", rollbackConfig, testCase.inpUpdate1) + } + } + }) + } +} diff --git a/src/k8splugin/internal/rb/config.go b/src/k8splugin/internal/rb/config.go deleted file mode 100644 index 3bd8347b..00000000 --- a/src/k8splugin/internal/rb/config.go +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Copyright 2018 Intel Corporation, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rb - -import ( - pkgerrors "github.com/pkg/errors" - "k8splugin/internal/db" - "strconv" - "strings" -) - -// Config contains the parameters needed for configuration -type Config struct { - ConfigName string `json:"config-name"` - TemplateName string `json:"template-name"` - Description string `json:"description"` - Values map[string]interface{} `json:"values"` -} - -//ConfigResult output for Create, Update and delete -type ConfigResult struct { - DefinitionName string `json:"rb-name"` - DefinitionVersion string `json:"rb-version"` - ProfileName string `json:"profile-name"` - ConfigName string `json:"config-name"` - TemplateName string `json:"template-name"` - ConfigVersion uint `json:"config-verion"` -} - -//ConfigRollback input -type ConfigRollback struct { - AnyOf struct { - ConfigVersion string `json:"config-version,omitempty"` - ConfigTag string `json:"config-tag,omitempty"` - } `json:"anyOf"` -} - -//ConfigTagit for Tagging configurations -type ConfigTagit struct { - TagName string `json:"tag-name"` -} - -// ConfigManager is an interface exposes the config functionality -type ConfigManager interface { - Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) - Get(rbName, rbVersion, profileName, configName string) (Config, error) - Help() map[string]string - Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) - Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) - Rollback(rbName, rbVersion, profileName string, p ConfigRollback) error - Tagit(rbName, rbVersion, profileName string, p ConfigTagit) error -} - -// ConfigClient implements the ConfigManager -// It will also be used to maintain some localized state -type ConfigClient struct { - tagTag string -} - -// NewConfigClient returns an instance of the ConfigClient -// which implements the ConfigManager -func NewConfigClient() *ConfigClient { - return &ConfigClient{ - tagTag: "tag", - } -} - -// Help returns some information on how to create the content -// for the config in the form of html formatted page -func (v *ConfigClient) Help() map[string]string { - ret := make(map[string]string) - - return ret -} - -// Create an entry for the config in the database -func (v *ConfigClient) Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) { - - // Check required fields - if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 { - return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") - } - cs := ConfigStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - configName: p.ConfigName, - } - _, err := cs.getConfig() - if err == nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Create Error - Config exists") - } else { - if strings.Contains(err.Error(), "Key doesn't exist") == false { - return ConfigResult{}, pkgerrors.Wrap(err, "Create Error") - } - } - lock, profileChannel := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "POST") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") - } - // Create Config DB Entry - err = cs.createConfig(p) - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Create Config DB Entry") - } - // Create Version Entry in DB for Config - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - version, err := cvs.createConfigVersion(p, Config{}, "POST") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") - } - // Create Result structure - cfgRes := ConfigResult{ - DefinitionName: rbName, - DefinitionVersion: rbVersion, - ProfileName: profileName, - ConfigName: p.ConfigName, - TemplateName: p.TemplateName, - ConfigVersion: version, - } - return cfgRes, nil -} - -// Update an entry for the config in the database -func (v *ConfigClient) Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) { - - // Check required fields - if len(p.Values) == 0 { - return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") - } - // Check if Config exists - cs := ConfigStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - configName: configName, - } - _, err := cs.getConfig() - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") - } - lock, profileChannel := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "PUT") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") - } - // Update Config DB Entry - configPrev, err := cs.updateConfig(p) - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry") - } - // Create Version Entry in DB for Config - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - version, err := cvs.createConfigVersion(p, configPrev, "PUT") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") - } - // Create Result structure - cfgRes := ConfigResult{ - DefinitionName: rbName, - DefinitionVersion: rbVersion, - ProfileName: profileName, - ConfigName: p.ConfigName, - TemplateName: p.TemplateName, - ConfigVersion: version, - } - return cfgRes, nil -} - -// Get config entry in the database -func (v *ConfigClient) Get(rbName, rbVersion, profileName, configName string) (Config, error) { - - // Acquire per profile Mutex - lock, _ := getProfileData(rbName + rbVersion + profileName) - lock.Lock() - defer lock.Unlock() - // Read Config DB - cs := ConfigStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - configName: configName, - } - cfg, err := cs.getConfig() - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") - } - return cfg, nil -} - -// Delete the Config from database -func (v *ConfigClient) Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) { - - // Check if Config exists - cs := ConfigStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - configName: configName, - } - p, err := cs.getConfig() - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") - } - lock, profileChannel := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "DELETE") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") - } - // Delete Config from DB - configPrev, err := cs.deleteConfig() - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry") - } - // Create Version Entry in DB for Config - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE") - if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry") - } - // Create Result structure - cfgRes := ConfigResult{ - DefinitionName: rbName, - DefinitionVersion: rbVersion, - ProfileName: profileName, - ConfigName: configName, - TemplateName: configPrev.TemplateName, - ConfigVersion: version, - } - return cfgRes, nil -} - -// Rollback starts from current version and rollbacks to the version desired -func (v *ConfigClient) Rollback(rbName, rbVersion, profileName string, rback ConfigRollback) error { - - var reqVersion string - var err error - - if rback.AnyOf.ConfigTag != "" { - reqVersion, err = v.GetTagVersion(rbName, rbVersion, profileName, rback.AnyOf.ConfigTag) - if err != nil { - return pkgerrors.Wrap(err, "Rollback Invalid tag") - } - } else if rback.AnyOf.ConfigVersion != "" { - reqVersion = rback.AnyOf.ConfigVersion - } else { - return pkgerrors.Errorf("No valid Index for Rollback") - } - - index, err := strconv.Atoi(reqVersion) - if err != nil { - return pkgerrors.Wrap(err, "Rollback Invalid Index") - } - rollbackIndex := uint(index) - - lock, profileChannel := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - currentVersion, err := cvs.getCurrentVersion() - if err != nil { - return pkgerrors.Wrap(err, "Rollback Get Current Config Version ") - } - - if rollbackIndex < 1 && rollbackIndex >= currentVersion { - return pkgerrors.Wrap(err, "Rollback Invalid Config Version") - } - - //Rollback all the intermettinent configurations - for i := currentVersion; i > rollbackIndex; i-- { - configNew, configPrev, action, err := cvs.getConfigVersion(i) - if err != nil { - return pkgerrors.Wrap(err, "Rollback Get Config Version") - } - cs := ConfigStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - configName: configNew.ConfigName, - } - if action == "PUT" { - // PUT is proceeded by PUT or POST - err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") - if err != nil { - return pkgerrors.Wrap(err, "Apply Config failed") - } - _, err = cs.updateConfig(configPrev) - if err != nil { - return pkgerrors.Wrap(err, "Update Config DB Entry") - } - } else if action == "POST" { - // POST is always preceeded by Config not existing - err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, "DELETE") - if err != nil { - return pkgerrors.Wrap(err, "Delete Config failed") - } - _, err = cs.deleteConfig() - if err != nil { - return pkgerrors.Wrap(err, "Delete Config DB Entry") - } - } else if action == "DELETE" { - // DELETE is proceeded by PUT or POST - err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") - if err != nil { - return pkgerrors.Wrap(err, "Delete Config failed") - } - _, err = cs.updateConfig(configPrev) - if err != nil { - return pkgerrors.Wrap(err, "Update Config DB Entry") - } - } - } - for i := currentVersion; i > rollbackIndex; i-- { - // Delete rolled back items - err = cvs.deleteConfigVersion() - if err != nil { - return pkgerrors.Wrap(err, "Delete Config Version ") - } - } - return nil -} - -// Tagit tags the current version with the tag provided -func (v *ConfigClient) Tagit(rbName, rbVersion, profileName string, tag ConfigTagit) error { - - lock, _ := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - currentVersion, err := cvs.getCurrentVersion() - if err != nil { - return pkgerrors.Wrap(err, "Get Current Config Version ") - } - tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tag.TagName) - - err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) - if err != nil { - return pkgerrors.Wrap(err, "TagIt store DB") - } - return nil -} - -// GetTagVersion returns the version associated with the tag -func (v *ConfigClient) GetTagVersion(rbName, rbVersion, profileName, tagName string) (string, error) { - - tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tagName) - - value, err := db.Etcd.Get(tagKey) - if err != nil { - return "", pkgerrors.Wrap(err, "Config DB Entry Not found") - } - return string(value), nil -} - -// ApplyAllConfig starts from first configuration version and applies all versions in sequence -func (v *ConfigClient) ApplyAllConfig(rbName, rbVersion, profileName string) error { - - lock, profileChannel := getProfileData(rbName + rbVersion + profileName) - // Acquire per profile Mutex - lock.Lock() - defer lock.Unlock() - - cvs := ConfigVersionStore{ - rbName: rbName, - rbVersion: rbVersion, - profileName: profileName, - } - currentVersion, err := cvs.getCurrentVersion() - if err != nil { - return pkgerrors.Wrap(err, "Get Current Config Version ") - } - if currentVersion < 1 { - return pkgerrors.Wrap(err, "No Config Version to Apply") - } - //Apply all configurations - var i uint - for i = 1; i <= currentVersion; i++ { - configNew, _, action, err := cvs.getConfigVersion(i) - if err != nil { - return pkgerrors.Wrap(err, "Get Config Version") - } - err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, action) - if err != nil { - return pkgerrors.Wrap(err, "Apply Config failed") - } - } - return nil -} diff --git a/src/k8splugin/internal/rb/config_backend.go b/src/k8splugin/internal/rb/config_backend.go deleted file mode 100644 index e2fa5b3c..00000000 --- a/src/k8splugin/internal/rb/config_backend.go +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Copyright 2018 Intel Corporation, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rb - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "log" - "path/filepath" - "strconv" - "strings" - "sync" - - "k8splugin/internal/db" - "k8splugin/internal/helm" - - "github.com/ghodss/yaml" - pkgerrors "github.com/pkg/errors" -) - -//ConfigStore contains the values that will be stored in the database -type configVersionDBContent struct { - ConfigNew Config `json:"config-new"` - ConfigPrev Config `json:"config-prev"` - Action string `json:"action"` // CRUD opration for this config -} - -//ConfigStore to Store the Config -type ConfigStore struct { - rbName string - rbVersion string - profileName string - configName string -} - -//ConfigVersionStore to Store the Versions of the Config -type ConfigVersionStore struct { - rbName string - rbVersion string - profileName string -} - -type configResourceList struct { - resourceTemplates []helm.KubernetesResourceTemplate - profile Profile - action string -} - -type profileDataManager struct { - profileLockMap map[string]*sync.Mutex - resourceChannel map[string](chan configResourceList) - sync.Mutex -} - -const ( - storeName = "config" - tagCounter = "counter" - tagVersion = "configversion" - tagConfig = "configdata" -) - -var profileData = profileDataManager{ - profileLockMap: map[string]*sync.Mutex{}, - resourceChannel: map[string]chan configResourceList{}, -} - -// Construct key for storing data -func constructKey(strs ...string) string { - - var sb strings.Builder - sb.WriteString("onapk8s") - sb.WriteString("/") - sb.WriteString(storeName) - sb.WriteString("/") - for _, str := range strs { - sb.WriteString(str) - sb.WriteString("/") - } - return sb.String() - -} - -// Create an entry for the config in the database -func (c ConfigStore) createConfig(p Config) error { - - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) - _, err := db.Etcd.Get(cfgKey) - if err == nil { - return pkgerrors.Wrap(err, "Config DB Entry Already exists") - } - configValue, err := db.Serialize(p) - if err != nil { - return pkgerrors.Wrap(err, "Serialize Config Value") - } - err = db.Etcd.Put(cfgKey, configValue) - if err != nil { - return pkgerrors.Wrap(err, "Config DB Entry") - } - return nil -} - -// Update the config entry in the database. Updates with the new value -// Returns the previous value of the Config -func (c ConfigStore) updateConfig(p Config) (Config, error) { - - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) - value, err := db.Etcd.Get(cfgKey) - configPrev := Config{} - if err == nil { - // If updating Config after rollback then previous config may not exist - err = db.DeSerialize(string(value), &configPrev) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") - } - } - configValue, err := db.Serialize(p) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Serialize Config Value") - } - err = db.Etcd.Put(cfgKey, configValue) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Config DB Entry") - } - return configPrev, nil -} - -// Read the config entry in the database -func (c ConfigStore) getConfig() (Config, error) { - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) - value, err := db.Etcd.Get(cfgKey) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") - } - //value is a byte array - if value != nil { - cfg := Config{} - err = db.DeSerialize(string(value), &cfg) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value") - } - return cfg, nil - } - return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") -} - -// Delete the config entry in the database -func (c ConfigStore) deleteConfig() (Config, error) { - - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) - value, err := db.Etcd.Get(cfgKey) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found") - } - configPrev := Config{} - err = db.DeSerialize(string(value), &configPrev) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") - } - - err = db.Etcd.Delete(cfgKey) - if err != nil { - return Config{}, pkgerrors.Wrap(err, "Config DB Entry") - } - return configPrev, nil -} - -// Create a version for the configuration. If previous config provided that is also stored -func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { - - version, err := c.incrementVersion() - - if err != nil { - return 0, pkgerrors.Wrap(err, "Get Next Version") - } - versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) - - var cs configVersionDBContent - cs.Action = action - cs.ConfigNew = configNew - cs.ConfigPrev = configPrev - - configValue, err := db.Serialize(cs) - if err != nil { - return 0, pkgerrors.Wrap(err, "Serialize Config Value") - } - err = db.Etcd.Put(versionKey, configValue) - if err != nil { - return 0, pkgerrors.Wrap(err, "Create Config DB Entry") - } - return version, nil -} - -// Delete current version of the configuration. Configuration always deleted from top -func (c ConfigVersionStore) deleteConfigVersion() error { - - counter, err := c.getCurrentVersion() - - if err != nil { - return pkgerrors.Wrap(err, "Get Next Version") - } - versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(counter))) - - err = db.Etcd.Delete(versionKey) - if err != nil { - return pkgerrors.Wrap(err, "Delete Config DB Entry") - } - err = c.decrementVersion() - if err != nil { - return pkgerrors.Wrap(err, "Decrement Version") - } - return nil -} - -// Read the specified version of the configuration and return its prev and current value. -// Also returns the action for the config version -func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) { - - versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) - configBytes, err := db.Etcd.Get(versionKey) - if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") - } - - if configBytes != nil { - pr := configVersionDBContent{} - err = db.DeSerialize(string(configBytes), &pr) - if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") - } - return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil - } - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") -} - -// Get the counter for the version -func (c ConfigVersionStore) getCurrentVersion() (uint, error) { - - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) - - value, err := db.Etcd.Get(cfgKey) - if err != nil { - if strings.Contains(err.Error(), "Key doesn't exist") == true { - // Counter not started yet, 0 is invalid value - return 0, nil - } else { - return 0, pkgerrors.Wrap(err, "Get Current Version") - } - } - - index, err := strconv.Atoi(string(value)) - if err != nil { - return 0, pkgerrors.Wrap(err, "Invalid counter") - } - return uint(index), nil -} - -// Update the counter for the version -func (c ConfigVersionStore) updateVersion(counter uint) error { - - cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) - err := db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) - if err != nil { - return pkgerrors.Wrap(err, "Counter DB Entry") - } - return nil -} - -// Increment the version counter -func (c ConfigVersionStore) incrementVersion() (uint, error) { - - counter, err := c.getCurrentVersion() - if err != nil { - return 0, pkgerrors.Wrap(err, "Get Next Counter Value") - } - //This is done while Profile lock is taken - counter++ - err = c.updateVersion(counter) - if err != nil { - return 0, pkgerrors.Wrap(err, "Store Next Counter Value") - } - - return counter, nil -} - -// Decrement the version counter -func (c ConfigVersionStore) decrementVersion() error { - - counter, err := c.getCurrentVersion() - if err != nil { - return pkgerrors.Wrap(err, "Get Next Counter Value") - } - //This is done while Profile lock is taken - counter-- - err = c.updateVersion(counter) - if err != nil { - return pkgerrors.Wrap(err, "Store Next Counter Value") - } - - return nil -} - -// Apply Config -func applyConfig(rbName, rbVersion, profileName string, p Config, pChannel chan configResourceList, action string) error { - - // Get Template and Resolve the template with values - crl, err := resolve(rbName, rbVersion, profileName, p) - if err != nil { - return pkgerrors.Wrap(err, "Resolve Config") - } - crl.action = action - // Send the configResourceList to the channel. Using select for non-blocking channel - select { - case pChannel <- crl: - log.Printf("Message Sent to goroutine %v", crl.profile) - default: - } - - return nil -} - -// Per Profile Go routine to apply the configuration to Cloud Region -func scheduleResources(c chan configResourceList) { - // Keep thread running - for { - data := <-c - //TODO: ADD Check to see if Application running - switch { - case data.action == "POST": - log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) - //TODO: Needs to add code to call Kubectl create - case data.action == "PUT": - log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) - //TODO: Needs to add code to call Kubectl apply - case data.action == "DELETE": - log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) - //TODO: Needs to add code to call Kubectl delete - - } - } -} - -//Resolve returns the path where the helm chart merged with -//configuration overrides resides. -var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { - - var resTemplates []helm.KubernetesResourceTemplate - - profile, err := NewProfileClient().Get(rbName, rbVersion, profileName) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data") - } - - t, err := NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Getting Template") - } - if t.ChartName == "" { - return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found") - } - - def, err := NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template") - } - - //Create a temp file in the system temp folder for values input - b, err := json.Marshal(p.Values) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data") - } - data, err := yaml.JSONToYAML(b) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML") - } - - outputfile, err := ioutil.TempFile("", "helm-config-values-") - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file") - } - _, err = outputfile.Write([]byte(data)) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file") - } - defer outputfile.Close() - - chartBasePath, err := ExtractTarBall(bytes.NewBuffer(def)) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template") - } - - helmClient := helm.NewTemplateClient(profile.KubernetesVersion, - profile.Namespace, - profile.ReleaseName) - - chartPath := filepath.Join(chartBasePath, t.ChartName) - resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath, - []string{outputfile.Name()}, - nil) - if err != nil { - return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml") - } - crl := configResourceList{ - resourceTemplates: resTemplates, - profile: profile, - } - - return crl, nil -} - -// Get the Mutex for the Profile -func getProfileData(key string) (*sync.Mutex, chan configResourceList) { - profileData.Lock() - defer profileData.Unlock() - _, ok := profileData.profileLockMap[key] - if !ok { - profileData.profileLockMap[key] = &sync.Mutex{} - } - _, ok = profileData.resourceChannel[key] - if !ok { - profileData.resourceChannel[key] = make(chan configResourceList) - go scheduleResources(profileData.resourceChannel[key]) - } - return profileData.profileLockMap[key], profileData.resourceChannel[key] -} diff --git a/src/k8splugin/internal/rb/config_test.go b/src/k8splugin/internal/rb/config_test.go deleted file mode 100644 index 9bf97a51..00000000 --- a/src/k8splugin/internal/rb/config_test.go +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright 2018 Intel Corporation, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rb - -import ( - "k8splugin/internal/db" - "reflect" - "strings" - "testing" - // pkgerrors "github.com/pkg/errors" -) - -func TestCreateConfig(t *testing.T) { - testCases := []struct { - label string - rbName string - rbVersion string - profileName string - inp Config - expectedError string - mockdb *db.MockEtcdClient - expected ConfigResult - }{ - { - label: "Create Config", - rbName: "testdef1", - rbVersion: "v1", - profileName: "testprofile1", - inp: Config{ - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - Values: map[string]interface{}{ - "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, - }, - expected: ConfigResult{ - DefinitionName: "testdef1", - DefinitionVersion: "v1", - ProfileName: "testprofile1", - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - ConfigVersion: 1, - }, - expectedError: "", - mockdb: &db.MockEtcdClient{ - Items: nil, - Err: nil, - }, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.label, func(t *testing.T) { - db.Etcd = testCase.mockdb - resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { - return configResourceList{}, nil - } - impl := NewConfigClient() - got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.expected, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ - " expected %v", got, testCase.expected) - } - } - }) - } -} - -func TestRollbackConfig(t *testing.T) { - testCases := []struct { - label string - rbName string - rbVersion string - profileName string - inp Config - inpUpdate1 Config - inpUpdate2 Config - expectedError string - mockdb *db.MockEtcdClient - expected1 ConfigResult - expected2 ConfigResult - expected3 ConfigResult - expected4 ConfigResult - rollbackConfig ConfigRollback - }{ - { - label: "Rollback Config", - rbName: "testdef1", - rbVersion: "v1", - profileName: "testprofile1", - inp: Config{ - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - Values: map[string]interface{}{ - "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, - }, - inpUpdate1: Config{ - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - Values: map[string]interface{}{ - "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 20,\"replicas\": 2, }}"}, - }, - inpUpdate2: Config{ - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - Values: map[string]interface{}{ - "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 30,\"replicas\": 2, }}"}, - }, - expected1: ConfigResult{ - DefinitionName: "testdef1", - DefinitionVersion: "v1", - ProfileName: "testprofile1", - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - ConfigVersion: 1, - }, - expected2: ConfigResult{ - DefinitionName: "testdef1", - DefinitionVersion: "v1", - ProfileName: "testprofile1", - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - ConfigVersion: 2, - }, - expected3: ConfigResult{ - DefinitionName: "testdef1", - DefinitionVersion: "v1", - ProfileName: "testprofile1", - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - ConfigVersion: 3, - }, - expected4: ConfigResult{ - DefinitionName: "testdef1", - DefinitionVersion: "v1", - ProfileName: "testprofile1", - ConfigName: "testconfig1", - TemplateName: "testtemplate1", - ConfigVersion: 4, - }, - expectedError: "", - mockdb: &db.MockEtcdClient{ - Items: nil, - Err: nil, - }, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.label, func(t *testing.T) { - db.Etcd = testCase.mockdb - resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { - return configResourceList{}, nil - } - impl := NewConfigClient() - got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.expected1, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ - " expected %v", got, testCase.expected1) - } - } - got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate1) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.expected2, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ - " expected %v", got, testCase.expected2) - } - } - got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate2) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.expected3, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ - " expected %v", got, testCase.expected3) - } - } - got, err = impl.Delete(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.expected4, got) == false { - t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ - " expected %v", got, testCase.expected4) - } - } - testCase.rollbackConfig.AnyOf.ConfigVersion = "2" - err = impl.Rollback(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.rollbackConfig) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } - rollbackConfig, err := impl.Get(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) - if err != nil { - if testCase.expectedError == "" { - t.Fatalf("Create returned an unexpected error %s", err) - } - if strings.Contains(err.Error(), testCase.expectedError) == false { - t.Fatalf("Create returned an unexpected error %s", err) - } - } else { - if reflect.DeepEqual(testCase.inpUpdate1, rollbackConfig) == false { - t.Errorf("Rollback config failed: got %v;"+ - " expected %v", rollbackConfig, testCase.inpUpdate1) - } - } - }) - } -} -- cgit 1.2.3-korg