diff options
Diffstat (limited to 'src/k8splugin/internal/app')
-rw-r--r-- | src/k8splugin/internal/app/config.go | 438 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_backend.go | 475 | ||||
-rw-r--r-- | src/k8splugin/internal/app/config_test.go | 259 |
3 files changed, 1172 insertions, 0 deletions
diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go new file mode 100644 index 00000000..f7e81358 --- /dev/null +++ b/src/k8splugin/internal/app/config.go @@ -0,0 +1,438 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "strconv" + "strings" + + "k8splugin/internal/db" + + pkgerrors "github.com/pkg/errors" +) + +// Config contains the parameters needed for configuration +type Config struct { + ConfigName string `json:"config-name"` + TemplateName string `json:"template-name"` + Description string `json:"description"` + Values map[string]interface{} `json:"values"` +} + +//ConfigResult output for Create, Update and delete +type ConfigResult struct { + DefinitionName string `json:"rb-name"` + DefinitionVersion string `json:"rb-version"` + ProfileName string `json:"profile-name"` + ConfigName string `json:"config-name"` + TemplateName string `json:"template-name"` + ConfigVersion uint `json:"config-verion"` +} + +//ConfigRollback input +type ConfigRollback struct { + AnyOf struct { + ConfigVersion string `json:"config-version,omitempty"` + ConfigTag string `json:"config-tag,omitempty"` + } `json:"anyOf"` +} + +//ConfigTagit for Tagging configurations +type ConfigTagit struct { + TagName string `json:"tag-name"` +} + +// ConfigManager is an interface exposes the config functionality +type ConfigManager interface { + Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) + Get(rbName, rbVersion, profileName, configName string) (Config, error) + Help() map[string]string + Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) + Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) + Rollback(rbName, rbVersion, profileName string, p ConfigRollback) error + Tagit(rbName, rbVersion, profileName string, p ConfigTagit) error +} + +// ConfigClient implements the ConfigManager +// It will also be used to maintain some localized state +type ConfigClient struct { + tagTag string +} + +// NewConfigClient returns an instance of the ConfigClient +// which implements the ConfigManager +func NewConfigClient() *ConfigClient { + return &ConfigClient{ + tagTag: "tag", + } +} + +// Help returns some information on how to create the content +// for the config in the form of html formatted page +func (v *ConfigClient) Help() map[string]string { + ret := make(map[string]string) + + return ret +} + +// Create an entry for the config in the database +func (v *ConfigClient) Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) { + + // Check required fields + if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 { + return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") + } + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: p.ConfigName, + } + _, err := cs.getConfig() + if err == nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Error - Config exists") + } else { + if strings.Contains(err.Error(), "Key doesn't exist") == false { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Error") + } + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "POST") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Create Config DB Entry + err = cs.createConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(p, Config{}, "POST") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: p.ConfigName, + TemplateName: p.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Update an entry for the config in the database +func (v *ConfigClient) Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) { + + // Check required fields + if len(p.Values) == 0 { + return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") + } + // Check if Config exists + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + _, err := cs.getConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "PUT") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Update Config DB Entry + configPrev, err := cs.updateConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(p, configPrev, "PUT") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: p.ConfigName, + TemplateName: p.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Get config entry in the database +func (v *ConfigClient) Get(rbName, rbVersion, profileName, configName string) (Config, error) { + + // Acquire per profile Mutex + lock, _ := getProfileData(rbName + rbVersion + profileName) + lock.Lock() + defer lock.Unlock() + // Read Config DB + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + cfg, err := cs.getConfig() + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") + } + return cfg, nil +} + +// Delete the Config from database +func (v *ConfigClient) Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) { + + // Check if Config exists + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configName, + } + p, err := cs.getConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "DELETE") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + // Delete Config from DB + configPrev, err := cs.deleteConfig() + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry") + } + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE") + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry") + } + // Create Result structure + cfgRes := ConfigResult{ + DefinitionName: rbName, + DefinitionVersion: rbVersion, + ProfileName: profileName, + ConfigName: configName, + TemplateName: configPrev.TemplateName, + ConfigVersion: version, + } + return cfgRes, nil +} + +// Rollback starts from current version and rollbacks to the version desired +func (v *ConfigClient) Rollback(rbName, rbVersion, profileName string, rback ConfigRollback) error { + + var reqVersion string + var err error + + if rback.AnyOf.ConfigTag != "" { + reqVersion, err = v.GetTagVersion(rbName, rbVersion, profileName, rback.AnyOf.ConfigTag) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Invalid tag") + } + } else if rback.AnyOf.ConfigVersion != "" { + reqVersion = rback.AnyOf.ConfigVersion + } else { + return pkgerrors.Errorf("No valid Index for Rollback") + } + + index, err := strconv.Atoi(reqVersion) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Invalid Index") + } + rollbackIndex := uint(index) + + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Current Config Version ") + } + + if rollbackIndex < 1 && rollbackIndex >= currentVersion { + return pkgerrors.Wrap(err, "Rollback Invalid Config Version") + } + + //Rollback all the intermettinent configurations + for i := currentVersion; i > rollbackIndex; i-- { + configNew, configPrev, action, err := cvs.getConfigVersion(i) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Config Version") + } + cs := ConfigStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + configName: configNew.ConfigName, + } + if action == "PUT" { + // PUT is proceeded by PUT or POST + err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") + if err != nil { + return pkgerrors.Wrap(err, "Apply Config failed") + } + _, err = cs.updateConfig(configPrev) + if err != nil { + return pkgerrors.Wrap(err, "Update Config DB Entry") + } + } else if action == "POST" { + // POST is always preceeded by Config not existing + err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, "DELETE") + if err != nil { + return pkgerrors.Wrap(err, "Delete Config failed") + } + _, err = cs.deleteConfig() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + } else if action == "DELETE" { + // DELETE is proceeded by PUT or POST + err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT") + if err != nil { + return pkgerrors.Wrap(err, "Delete Config failed") + } + _, err = cs.updateConfig(configPrev) + if err != nil { + return pkgerrors.Wrap(err, "Update Config DB Entry") + } + } + } + for i := currentVersion; i > rollbackIndex; i-- { + // Delete rolled back items + err = cvs.deleteConfigVersion() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config Version ") + } + } + return nil +} + +// Tagit tags the current version with the tag provided +func (v *ConfigClient) Tagit(rbName, rbVersion, profileName string, tag ConfigTagit) error { + + lock, _ := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tag.TagName) + + err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) + if err != nil { + return pkgerrors.Wrap(err, "TagIt store DB") + } + return nil +} + +// GetTagVersion returns the version associated with the tag +func (v *ConfigClient) GetTagVersion(rbName, rbVersion, profileName, tagName string) (string, error) { + + tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tagName) + + value, err := db.Etcd.Get(tagKey) + if err != nil { + return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + } + return string(value), nil +} + +// ApplyAllConfig starts from first configuration version and applies all versions in sequence +func (v *ConfigClient) ApplyAllConfig(rbName, rbVersion, profileName string) error { + + lock, profileChannel := getProfileData(rbName + rbVersion + profileName) + // Acquire per profile Mutex + lock.Lock() + defer lock.Unlock() + + cvs := ConfigVersionStore{ + rbName: rbName, + rbVersion: rbVersion, + profileName: profileName, + } + currentVersion, err := cvs.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + if currentVersion < 1 { + return pkgerrors.Wrap(err, "No Config Version to Apply") + } + //Apply all configurations + var i uint + for i = 1; i <= currentVersion; i++ { + configNew, _, action, err := cvs.getConfigVersion(i) + if err != nil { + return pkgerrors.Wrap(err, "Get Config Version") + } + err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, action) + if err != nil { + return pkgerrors.Wrap(err, "Apply Config failed") + } + } + return nil +} diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go new file mode 100644 index 00000000..763aed0d --- /dev/null +++ b/src/k8splugin/internal/app/config_backend.go @@ -0,0 +1,475 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "log" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "k8splugin/internal/db" + "k8splugin/internal/helm" + "k8splugin/internal/rb" + + "github.com/ghodss/yaml" + pkgerrors "github.com/pkg/errors" +) + +//ConfigStore contains the values that will be stored in the database +type configVersionDBContent struct { + ConfigNew Config `json:"config-new"` + ConfigPrev Config `json:"config-prev"` + Action string `json:"action"` // CRUD opration for this config +} + +//ConfigStore to Store the Config +type ConfigStore struct { + rbName string + rbVersion string + profileName string + configName string +} + +//ConfigVersionStore to Store the Versions of the Config +type ConfigVersionStore struct { + rbName string + rbVersion string + profileName string +} + +type configResourceList struct { + resourceTemplates []helm.KubernetesResourceTemplate + createdResources []helm.KubernetesResource + profile rb.Profile + action string +} + +type profileDataManager struct { + profileLockMap map[string]*sync.Mutex + resourceChannel map[string](chan configResourceList) + sync.Mutex +} + +const ( + storeName = "config" + tagCounter = "counter" + tagVersion = "configversion" + tagConfig = "configdata" +) + +var profileData = profileDataManager{ + profileLockMap: map[string]*sync.Mutex{}, + resourceChannel: map[string]chan configResourceList{}, +} + +// Construct key for storing data +func constructKey(strs ...string) string { + + var sb strings.Builder + sb.WriteString("onapk8s") + sb.WriteString("/") + sb.WriteString(storeName) + sb.WriteString("/") + for _, str := range strs { + sb.WriteString(str) + sb.WriteString("/") + } + return sb.String() + +} + +// Create an entry for the config in the database +func (c ConfigStore) createConfig(p Config) error { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) + _, err := db.Etcd.Get(cfgKey) + if err == nil { + return pkgerrors.Wrap(err, "Config DB Entry Already exists") + } + configValue, err := db.Serialize(p) + if err != nil { + return pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(cfgKey, configValue) + if err != nil { + return pkgerrors.Wrap(err, "Config DB Entry") + } + return nil +} + +// Update the config entry in the database. Updates with the new value +// Returns the previous value of the Config +func (c ConfigStore) updateConfig(p Config) (Config, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName) + value, err := db.Etcd.Get(cfgKey) + configPrev := Config{} + if err == nil { + // If updating Config after rollback then previous config may not exist + err = db.DeSerialize(string(value), &configPrev) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") + } + } + configValue, err := db.Serialize(p) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(cfgKey, configValue) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry") + } + return configPrev, nil +} + +// Read the config entry in the database +func (c ConfigStore) getConfig() (Config, error) { + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) + value, err := db.Etcd.Get(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") + } + //value is a byte array + if value != nil { + cfg := Config{} + err = db.DeSerialize(string(value), &cfg) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value") + } + return cfg, nil + } + return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry") +} + +// Delete the config entry in the database +func (c ConfigStore) deleteConfig() (Config, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName) + value, err := db.Etcd.Get(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found") + } + configPrev := Config{} + err = db.DeSerialize(string(value), &configPrev) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value") + } + + err = db.Etcd.Delete(cfgKey) + if err != nil { + return Config{}, pkgerrors.Wrap(err, "Config DB Entry") + } + return configPrev, nil +} + +// Create a version for the configuration. If previous config provided that is also stored +func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { + + version, err := c.incrementVersion() + + if err != nil { + return 0, pkgerrors.Wrap(err, "Get Next Version") + } + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) + + var cs configVersionDBContent + cs.Action = action + cs.ConfigNew = configNew + cs.ConfigPrev = configPrev + + configValue, err := db.Serialize(cs) + if err != nil { + return 0, pkgerrors.Wrap(err, "Serialize Config Value") + } + err = db.Etcd.Put(versionKey, configValue) + if err != nil { + return 0, pkgerrors.Wrap(err, "Create Config DB Entry") + } + return version, nil +} + +// Delete current version of the configuration. Configuration always deleted from top +func (c ConfigVersionStore) deleteConfigVersion() error { + + counter, err := c.getCurrentVersion() + + if err != nil { + return pkgerrors.Wrap(err, "Get Next Version") + } + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(counter))) + + err = db.Etcd.Delete(versionKey) + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + err = c.decrementVersion() + if err != nil { + return pkgerrors.Wrap(err, "Decrement Version") + } + return nil +} + +// Read the specified version of the configuration and return its prev and current value. +// Also returns the action for the config version +func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) { + + versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version))) + configBytes, err := db.Etcd.Get(versionKey) + if err != nil { + return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") + } + + if configBytes != nil { + pr := configVersionDBContent{} + err = db.DeSerialize(string(configBytes), &pr) + if err != nil { + return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") + } + return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil + } + return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") +} + +// Get the counter for the version +func (c ConfigVersionStore) getCurrentVersion() (uint, error) { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) + + value, err := db.Etcd.Get(cfgKey) + if err != nil { + if strings.Contains(err.Error(), "Key doesn't exist") == true { + // Counter not started yet, 0 is invalid value + return 0, nil + } else { + return 0, pkgerrors.Wrap(err, "Get Current Version") + } + } + + index, err := strconv.Atoi(string(value)) + if err != nil { + return 0, pkgerrors.Wrap(err, "Invalid counter") + } + return uint(index), nil +} + +// Update the counter for the version +func (c ConfigVersionStore) updateVersion(counter uint) error { + + cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter) + err := db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) + if err != nil { + return pkgerrors.Wrap(err, "Counter DB Entry") + } + return nil +} + +// Increment the version counter +func (c ConfigVersionStore) incrementVersion() (uint, error) { + + counter, err := c.getCurrentVersion() + if err != nil { + return 0, pkgerrors.Wrap(err, "Get Next Counter Value") + } + //This is done while Profile lock is taken + counter++ + err = c.updateVersion(counter) + if err != nil { + return 0, pkgerrors.Wrap(err, "Store Next Counter Value") + } + + return counter, nil +} + +// Decrement the version counter +func (c ConfigVersionStore) decrementVersion() error { + + counter, err := c.getCurrentVersion() + if err != nil { + return pkgerrors.Wrap(err, "Get Next Counter Value") + } + //This is done while Profile lock is taken + counter-- + err = c.updateVersion(counter) + if err != nil { + return pkgerrors.Wrap(err, "Store Next Counter Value") + } + + return nil +} + +// Apply Config +func applyConfig(rbName, rbVersion, profileName string, p Config, pChannel chan configResourceList, action string) error { + + // Get Template and Resolve the template with values + crl, err := resolve(rbName, rbVersion, profileName, p) + if err != nil { + return pkgerrors.Wrap(err, "Resolve Config") + } + crl.action = action + // Send the configResourceList to the channel. Using select for non-blocking channel + select { + case pChannel <- crl: + log.Printf("Message Sent to goroutine %v", crl.profile) + default: + } + + return nil +} + +// Per Profile Go routine to apply the configuration to Cloud Region +func scheduleResources(c chan configResourceList) { + // Keep thread running + for { + data := <-c + //TODO: ADD Check to see if Application running + ic := NewInstanceClient() + resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName) + if err != nil || len(resp) == 0 { + log.Println("Error finding a running instance. Retrying later...") + time.Sleep(time.Second * 10) + continue + } + switch { + case data.action == "POST": + log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.init(inst.CloudRegion) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) + if err != nil { + log.Printf("Error Creating resources: %s", err.Error()) + continue + } + } + //TODO: Needs to add code to call Kubectl create + case data.action == "PUT": + log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) + //TODO: Needs to add code to call Kubectl apply + case data.action == "DELETE": + log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.init(inst.CloudRegion) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + err = k8sClient.deleteResources(data.createdResources, inst.Namespace) + if err != nil { + log.Printf("Error Deleting resources: %s", err.Error()) + continue + } + } + } + } +} + +//Resolve returns the path where the helm chart merged with +//configuration overrides resides. +var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + + var resTemplates []helm.KubernetesResourceTemplate + + profile, err := rb.NewProfileClient().Get(rbName, rbVersion, profileName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data") + } + + t, err := rb.NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Getting Template") + } + if t.ChartName == "" { + return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found") + } + + def, err := rb.NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template") + } + + //Create a temp file in the system temp folder for values input + b, err := json.Marshal(p.Values) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data") + } + data, err := yaml.JSONToYAML(b) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML") + } + + outputfile, err := ioutil.TempFile("", "helm-config-values-") + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file") + } + _, err = outputfile.Write([]byte(data)) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file") + } + defer outputfile.Close() + + chartBasePath, err := rb.ExtractTarBall(bytes.NewBuffer(def)) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template") + } + + helmClient := helm.NewTemplateClient(profile.KubernetesVersion, + profile.Namespace, + profile.ReleaseName) + + chartPath := filepath.Join(chartBasePath, t.ChartName) + resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath, + []string{outputfile.Name()}, + nil) + if err != nil { + return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml") + } + crl := configResourceList{ + resourceTemplates: resTemplates, + profile: profile, + } + + return crl, nil +} + +// Get the Mutex for the Profile +func getProfileData(key string) (*sync.Mutex, chan configResourceList) { + profileData.Lock() + defer profileData.Unlock() + _, ok := profileData.profileLockMap[key] + if !ok { + profileData.profileLockMap[key] = &sync.Mutex{} + } + _, ok = profileData.resourceChannel[key] + if !ok { + profileData.resourceChannel[key] = make(chan configResourceList) + go scheduleResources(profileData.resourceChannel[key]) + } + return profileData.profileLockMap[key], profileData.resourceChannel[key] +} diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go new file mode 100644 index 00000000..11a300ff --- /dev/null +++ b/src/k8splugin/internal/app/config_test.go @@ -0,0 +1,259 @@ +/* + * Copyright 2018 Intel Corporation, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "k8splugin/internal/db" + "reflect" + "strings" + "testing" + // pkgerrors "github.com/pkg/errors" +) + +func TestCreateConfig(t *testing.T) { + testCases := []struct { + label string + rbName string + rbVersion string + profileName string + inp Config + expectedError string + mockdb *db.MockEtcdClient + expected ConfigResult + }{ + { + label: "Create Config", + rbName: "testdef1", + rbVersion: "v1", + profileName: "testprofile1", + inp: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, + }, + expected: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 1, + }, + expectedError: "", + mockdb: &db.MockEtcdClient{ + Items: nil, + Err: nil, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.label, func(t *testing.T) { + db.Etcd = testCase.mockdb + resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + return configResourceList{}, nil + } + impl := NewConfigClient() + got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected) + } + } + }) + } +} + +func TestRollbackConfig(t *testing.T) { + testCases := []struct { + label string + rbName string + rbVersion string + profileName string + inp Config + inpUpdate1 Config + inpUpdate2 Config + expectedError string + mockdb *db.MockEtcdClient + expected1 ConfigResult + expected2 ConfigResult + expected3 ConfigResult + expected4 ConfigResult + rollbackConfig ConfigRollback + }{ + { + label: "Rollback Config", + rbName: "testdef1", + rbVersion: "v1", + profileName: "testprofile1", + inp: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"}, + }, + inpUpdate1: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 20,\"replicas\": 2, }}"}, + }, + inpUpdate2: Config{ + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + Values: map[string]interface{}{ + "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 30,\"replicas\": 2, }}"}, + }, + expected1: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 1, + }, + expected2: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 2, + }, + expected3: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 3, + }, + expected4: ConfigResult{ + DefinitionName: "testdef1", + DefinitionVersion: "v1", + ProfileName: "testprofile1", + ConfigName: "testconfig1", + TemplateName: "testtemplate1", + ConfigVersion: 4, + }, + expectedError: "", + mockdb: &db.MockEtcdClient{ + Items: nil, + Err: nil, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.label, func(t *testing.T) { + db.Etcd = testCase.mockdb + resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) { + return configResourceList{}, nil + } + impl := NewConfigClient() + got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected1, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected1) + } + } + got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate1) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected2, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected2) + } + } + got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate2) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected3, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected3) + } + } + got, err = impl.Delete(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.expected4, got) == false { + t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+ + " expected %v", got, testCase.expected4) + } + } + testCase.rollbackConfig.AnyOf.ConfigVersion = "2" + err = impl.Rollback(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.rollbackConfig) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } + rollbackConfig, err := impl.Get(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName) + if err != nil { + if testCase.expectedError == "" { + t.Fatalf("Create returned an unexpected error %s", err) + } + if strings.Contains(err.Error(), testCase.expectedError) == false { + t.Fatalf("Create returned an unexpected error %s", err) + } + } else { + if reflect.DeepEqual(testCase.inpUpdate1, rollbackConfig) == false { + t.Errorf("Rollback config failed: got %v;"+ + " expected %v", rollbackConfig, testCase.inpUpdate1) + } + } + }) + } +} |