summaryrefslogtreecommitdiffstats
path: root/src/k8splugin/internal/app
diff options
context:
space:
mode:
Diffstat (limited to 'src/k8splugin/internal/app')
-rw-r--r--src/k8splugin/internal/app/config.go438
-rw-r--r--src/k8splugin/internal/app/config_backend.go475
-rw-r--r--src/k8splugin/internal/app/config_test.go259
3 files changed, 1172 insertions, 0 deletions
diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go
new file mode 100644
index 00000000..f7e81358
--- /dev/null
+++ b/src/k8splugin/internal/app/config.go
@@ -0,0 +1,438 @@
+/*
+ * Copyright 2018 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+ "strconv"
+ "strings"
+
+ "k8splugin/internal/db"
+
+ pkgerrors "github.com/pkg/errors"
+)
+
+// Config contains the parameters needed for configuration
+type Config struct {
+ ConfigName string `json:"config-name"`
+ TemplateName string `json:"template-name"`
+ Description string `json:"description"`
+ Values map[string]interface{} `json:"values"`
+}
+
+//ConfigResult output for Create, Update and delete
+type ConfigResult struct {
+ DefinitionName string `json:"rb-name"`
+ DefinitionVersion string `json:"rb-version"`
+ ProfileName string `json:"profile-name"`
+ ConfigName string `json:"config-name"`
+ TemplateName string `json:"template-name"`
+ ConfigVersion uint `json:"config-verion"`
+}
+
+//ConfigRollback input
+type ConfigRollback struct {
+ AnyOf struct {
+ ConfigVersion string `json:"config-version,omitempty"`
+ ConfigTag string `json:"config-tag,omitempty"`
+ } `json:"anyOf"`
+}
+
+//ConfigTagit for Tagging configurations
+type ConfigTagit struct {
+ TagName string `json:"tag-name"`
+}
+
+// ConfigManager is an interface exposes the config functionality
+type ConfigManager interface {
+ Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error)
+ Get(rbName, rbVersion, profileName, configName string) (Config, error)
+ Help() map[string]string
+ Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error)
+ Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error)
+ Rollback(rbName, rbVersion, profileName string, p ConfigRollback) error
+ Tagit(rbName, rbVersion, profileName string, p ConfigTagit) error
+}
+
+// ConfigClient implements the ConfigManager
+// It will also be used to maintain some localized state
+type ConfigClient struct {
+ tagTag string
+}
+
+// NewConfigClient returns an instance of the ConfigClient
+// which implements the ConfigManager
+func NewConfigClient() *ConfigClient {
+ return &ConfigClient{
+ tagTag: "tag",
+ }
+}
+
+// Help returns some information on how to create the content
+// for the config in the form of html formatted page
+func (v *ConfigClient) Help() map[string]string {
+ ret := make(map[string]string)
+
+ return ret
+}
+
+// Create an entry for the config in the database
+func (v *ConfigClient) Create(rbName, rbVersion, profileName string, p Config) (ConfigResult, error) {
+
+ // Check required fields
+ if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 {
+ return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
+ }
+ cs := ConfigStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ configName: p.ConfigName,
+ }
+ _, err := cs.getConfig()
+ if err == nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Error - Config exists")
+ } else {
+ if strings.Contains(err.Error(), "Key doesn't exist") == false {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Error")
+ }
+ }
+ lock, profileChannel := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "POST")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ // Create Config DB Entry
+ err = cs.createConfig(p)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Config DB Entry")
+ }
+ // Create Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ version, err := cvs.createConfigVersion(p, Config{}, "POST")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
+ }
+ // Create Result structure
+ cfgRes := ConfigResult{
+ DefinitionName: rbName,
+ DefinitionVersion: rbVersion,
+ ProfileName: profileName,
+ ConfigName: p.ConfigName,
+ TemplateName: p.TemplateName,
+ ConfigVersion: version,
+ }
+ return cfgRes, nil
+}
+
+// Update an entry for the config in the database
+func (v *ConfigClient) Update(rbName, rbVersion, profileName, configName string, p Config) (ConfigResult, error) {
+
+ // Check required fields
+ if len(p.Values) == 0 {
+ return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
+ }
+ // Check if Config exists
+ cs := ConfigStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ configName: configName,
+ }
+ _, err := cs.getConfig()
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ }
+ lock, profileChannel := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "PUT")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ // Update Config DB Entry
+ configPrev, err := cs.updateConfig(p)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry")
+ }
+ // Create Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ version, err := cvs.createConfigVersion(p, configPrev, "PUT")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
+ }
+ // Create Result structure
+ cfgRes := ConfigResult{
+ DefinitionName: rbName,
+ DefinitionVersion: rbVersion,
+ ProfileName: profileName,
+ ConfigName: p.ConfigName,
+ TemplateName: p.TemplateName,
+ ConfigVersion: version,
+ }
+ return cfgRes, nil
+}
+
+// Get config entry in the database
+func (v *ConfigClient) Get(rbName, rbVersion, profileName, configName string) (Config, error) {
+
+ // Acquire per profile Mutex
+ lock, _ := getProfileData(rbName + rbVersion + profileName)
+ lock.Lock()
+ defer lock.Unlock()
+ // Read Config DB
+ cs := ConfigStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ configName: configName,
+ }
+ cfg, err := cs.getConfig()
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
+ }
+ return cfg, nil
+}
+
+// Delete the Config from database
+func (v *ConfigClient) Delete(rbName, rbVersion, profileName, configName string) (ConfigResult, error) {
+
+ // Check if Config exists
+ cs := ConfigStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ configName: configName,
+ }
+ p, err := cs.getConfig()
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ }
+ lock, profileChannel := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+ err = applyConfig(rbName, rbVersion, profileName, p, profileChannel, "DELETE")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ // Delete Config from DB
+ configPrev, err := cs.deleteConfig()
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry")
+ }
+ // Create Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE")
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry")
+ }
+ // Create Result structure
+ cfgRes := ConfigResult{
+ DefinitionName: rbName,
+ DefinitionVersion: rbVersion,
+ ProfileName: profileName,
+ ConfigName: configName,
+ TemplateName: configPrev.TemplateName,
+ ConfigVersion: version,
+ }
+ return cfgRes, nil
+}
+
+// Rollback starts from current version and rollbacks to the version desired
+func (v *ConfigClient) Rollback(rbName, rbVersion, profileName string, rback ConfigRollback) error {
+
+ var reqVersion string
+ var err error
+
+ if rback.AnyOf.ConfigTag != "" {
+ reqVersion, err = v.GetTagVersion(rbName, rbVersion, profileName, rback.AnyOf.ConfigTag)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Invalid tag")
+ }
+ } else if rback.AnyOf.ConfigVersion != "" {
+ reqVersion = rback.AnyOf.ConfigVersion
+ } else {
+ return pkgerrors.Errorf("No valid Index for Rollback")
+ }
+
+ index, err := strconv.Atoi(reqVersion)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Invalid Index")
+ }
+ rollbackIndex := uint(index)
+
+ lock, profileChannel := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ currentVersion, err := cvs.getCurrentVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Get Current Config Version ")
+ }
+
+ if rollbackIndex < 1 && rollbackIndex >= currentVersion {
+ return pkgerrors.Wrap(err, "Rollback Invalid Config Version")
+ }
+
+ //Rollback all the intermettinent configurations
+ for i := currentVersion; i > rollbackIndex; i-- {
+ configNew, configPrev, action, err := cvs.getConfigVersion(i)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Get Config Version")
+ }
+ cs := ConfigStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ configName: configNew.ConfigName,
+ }
+ if action == "PUT" {
+ // PUT is proceeded by PUT or POST
+ err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT")
+ if err != nil {
+ return pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ _, err = cs.updateConfig(configPrev)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Update Config DB Entry")
+ }
+ } else if action == "POST" {
+ // POST is always preceeded by Config not existing
+ err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, "DELETE")
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config failed")
+ }
+ _, err = cs.deleteConfig()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config DB Entry")
+ }
+ } else if action == "DELETE" {
+ // DELETE is proceeded by PUT or POST
+ err = applyConfig(rbName, rbVersion, profileName, configPrev, profileChannel, "PUT")
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config failed")
+ }
+ _, err = cs.updateConfig(configPrev)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Update Config DB Entry")
+ }
+ }
+ }
+ for i := currentVersion; i > rollbackIndex; i-- {
+ // Delete rolled back items
+ err = cvs.deleteConfigVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config Version ")
+ }
+ }
+ return nil
+}
+
+// Tagit tags the current version with the tag provided
+func (v *ConfigClient) Tagit(rbName, rbVersion, profileName string, tag ConfigTagit) error {
+
+ lock, _ := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ currentVersion, err := cvs.getCurrentVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Current Config Version ")
+ }
+ tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tag.TagName)
+
+ err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+ if err != nil {
+ return pkgerrors.Wrap(err, "TagIt store DB")
+ }
+ return nil
+}
+
+// GetTagVersion returns the version associated with the tag
+func (v *ConfigClient) GetTagVersion(rbName, rbVersion, profileName, tagName string) (string, error) {
+
+ tagKey := constructKey(rbName, rbVersion, profileName, v.tagTag, tagName)
+
+ value, err := db.Etcd.Get(tagKey)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+ }
+ return string(value), nil
+}
+
+// ApplyAllConfig starts from first configuration version and applies all versions in sequence
+func (v *ConfigClient) ApplyAllConfig(rbName, rbVersion, profileName string) error {
+
+ lock, profileChannel := getProfileData(rbName + rbVersion + profileName)
+ // Acquire per profile Mutex
+ lock.Lock()
+ defer lock.Unlock()
+
+ cvs := ConfigVersionStore{
+ rbName: rbName,
+ rbVersion: rbVersion,
+ profileName: profileName,
+ }
+ currentVersion, err := cvs.getCurrentVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Current Config Version ")
+ }
+ if currentVersion < 1 {
+ return pkgerrors.Wrap(err, "No Config Version to Apply")
+ }
+ //Apply all configurations
+ var i uint
+ for i = 1; i <= currentVersion; i++ {
+ configNew, _, action, err := cvs.getConfigVersion(i)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Config Version")
+ }
+ err = applyConfig(rbName, rbVersion, profileName, configNew, profileChannel, action)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ }
+ return nil
+}
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go
new file mode 100644
index 00000000..763aed0d
--- /dev/null
+++ b/src/k8splugin/internal/app/config_backend.go
@@ -0,0 +1,475 @@
+/*
+ * Copyright 2018 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+ "bytes"
+ "encoding/json"
+ "io/ioutil"
+ "log"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "k8splugin/internal/db"
+ "k8splugin/internal/helm"
+ "k8splugin/internal/rb"
+
+ "github.com/ghodss/yaml"
+ pkgerrors "github.com/pkg/errors"
+)
+
+//ConfigStore contains the values that will be stored in the database
+type configVersionDBContent struct {
+ ConfigNew Config `json:"config-new"`
+ ConfigPrev Config `json:"config-prev"`
+ Action string `json:"action"` // CRUD opration for this config
+}
+
+//ConfigStore to Store the Config
+type ConfigStore struct {
+ rbName string
+ rbVersion string
+ profileName string
+ configName string
+}
+
+//ConfigVersionStore to Store the Versions of the Config
+type ConfigVersionStore struct {
+ rbName string
+ rbVersion string
+ profileName string
+}
+
+type configResourceList struct {
+ resourceTemplates []helm.KubernetesResourceTemplate
+ createdResources []helm.KubernetesResource
+ profile rb.Profile
+ action string
+}
+
+type profileDataManager struct {
+ profileLockMap map[string]*sync.Mutex
+ resourceChannel map[string](chan configResourceList)
+ sync.Mutex
+}
+
+const (
+ storeName = "config"
+ tagCounter = "counter"
+ tagVersion = "configversion"
+ tagConfig = "configdata"
+)
+
+var profileData = profileDataManager{
+ profileLockMap: map[string]*sync.Mutex{},
+ resourceChannel: map[string]chan configResourceList{},
+}
+
+// Construct key for storing data
+func constructKey(strs ...string) string {
+
+ var sb strings.Builder
+ sb.WriteString("onapk8s")
+ sb.WriteString("/")
+ sb.WriteString(storeName)
+ sb.WriteString("/")
+ for _, str := range strs {
+ sb.WriteString(str)
+ sb.WriteString("/")
+ }
+ return sb.String()
+
+}
+
+// Create an entry for the config in the database
+func (c ConfigStore) createConfig(p Config) error {
+
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName)
+ _, err := db.Etcd.Get(cfgKey)
+ if err == nil {
+ return pkgerrors.Wrap(err, "Config DB Entry Already exists")
+ }
+ configValue, err := db.Serialize(p)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Serialize Config Value")
+ }
+ err = db.Etcd.Put(cfgKey, configValue)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Config DB Entry")
+ }
+ return nil
+}
+
+// Update the config entry in the database. Updates with the new value
+// Returns the previous value of the Config
+func (c ConfigStore) updateConfig(p Config) (Config, error) {
+
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, p.ConfigName)
+ value, err := db.Etcd.Get(cfgKey)
+ configPrev := Config{}
+ if err == nil {
+ // If updating Config after rollback then previous config may not exist
+ err = db.DeSerialize(string(value), &configPrev)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
+ }
+ }
+ configValue, err := db.Serialize(p)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Serialize Config Value")
+ }
+ err = db.Etcd.Put(cfgKey, configValue)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
+ }
+ return configPrev, nil
+}
+
+// Read the config entry in the database
+func (c ConfigStore) getConfig() (Config, error) {
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName)
+ value, err := db.Etcd.Get(cfgKey)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
+ }
+ //value is a byte array
+ if value != nil {
+ cfg := Config{}
+ err = db.DeSerialize(string(value), &cfg)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value")
+ }
+ return cfg, nil
+ }
+ return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
+}
+
+// Delete the config entry in the database
+func (c ConfigStore) deleteConfig() (Config, error) {
+
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagConfig, c.configName)
+ value, err := db.Etcd.Get(cfgKey)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found")
+ }
+ configPrev := Config{}
+ err = db.DeSerialize(string(value), &configPrev)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
+ }
+
+ err = db.Etcd.Delete(cfgKey)
+ if err != nil {
+ return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
+ }
+ return configPrev, nil
+}
+
+// Create a version for the configuration. If previous config provided that is also stored
+func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
+
+ version, err := c.incrementVersion()
+
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Get Next Version")
+ }
+ versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version)))
+
+ var cs configVersionDBContent
+ cs.Action = action
+ cs.ConfigNew = configNew
+ cs.ConfigPrev = configPrev
+
+ configValue, err := db.Serialize(cs)
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Serialize Config Value")
+ }
+ err = db.Etcd.Put(versionKey, configValue)
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Create Config DB Entry")
+ }
+ return version, nil
+}
+
+// Delete current version of the configuration. Configuration always deleted from top
+func (c ConfigVersionStore) deleteConfigVersion() error {
+
+ counter, err := c.getCurrentVersion()
+
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Next Version")
+ }
+ versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(counter)))
+
+ err = db.Etcd.Delete(versionKey)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config DB Entry")
+ }
+ err = c.decrementVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Decrement Version")
+ }
+ return nil
+}
+
+// Read the specified version of the configuration and return its prev and current value.
+// Also returns the action for the config version
+func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) {
+
+ versionKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagVersion, strconv.Itoa(int(version)))
+ configBytes, err := db.Etcd.Get(versionKey)
+ if err != nil {
+ return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
+ }
+
+ if configBytes != nil {
+ pr := configVersionDBContent{}
+ err = db.DeSerialize(string(configBytes), &pr)
+ if err != nil {
+ return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
+ }
+ return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
+ }
+ return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
+}
+
+// Get the counter for the version
+func (c ConfigVersionStore) getCurrentVersion() (uint, error) {
+
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter)
+
+ value, err := db.Etcd.Get(cfgKey)
+ if err != nil {
+ if strings.Contains(err.Error(), "Key doesn't exist") == true {
+ // Counter not started yet, 0 is invalid value
+ return 0, nil
+ } else {
+ return 0, pkgerrors.Wrap(err, "Get Current Version")
+ }
+ }
+
+ index, err := strconv.Atoi(string(value))
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Invalid counter")
+ }
+ return uint(index), nil
+}
+
+// Update the counter for the version
+func (c ConfigVersionStore) updateVersion(counter uint) error {
+
+ cfgKey := constructKey(c.rbName, c.rbVersion, c.profileName, tagCounter)
+ err := db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
+ if err != nil {
+ return pkgerrors.Wrap(err, "Counter DB Entry")
+ }
+ return nil
+}
+
+// Increment the version counter
+func (c ConfigVersionStore) incrementVersion() (uint, error) {
+
+ counter, err := c.getCurrentVersion()
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Get Next Counter Value")
+ }
+ //This is done while Profile lock is taken
+ counter++
+ err = c.updateVersion(counter)
+ if err != nil {
+ return 0, pkgerrors.Wrap(err, "Store Next Counter Value")
+ }
+
+ return counter, nil
+}
+
+// Decrement the version counter
+func (c ConfigVersionStore) decrementVersion() error {
+
+ counter, err := c.getCurrentVersion()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Next Counter Value")
+ }
+ //This is done while Profile lock is taken
+ counter--
+ err = c.updateVersion(counter)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Store Next Counter Value")
+ }
+
+ return nil
+}
+
+// Apply Config
+func applyConfig(rbName, rbVersion, profileName string, p Config, pChannel chan configResourceList, action string) error {
+
+ // Get Template and Resolve the template with values
+ crl, err := resolve(rbName, rbVersion, profileName, p)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Resolve Config")
+ }
+ crl.action = action
+ // Send the configResourceList to the channel. Using select for non-blocking channel
+ select {
+ case pChannel <- crl:
+ log.Printf("Message Sent to goroutine %v", crl.profile)
+ default:
+ }
+
+ return nil
+}
+
+// Per Profile Go routine to apply the configuration to Cloud Region
+func scheduleResources(c chan configResourceList) {
+ // Keep thread running
+ for {
+ data := <-c
+ //TODO: ADD Check to see if Application running
+ ic := NewInstanceClient()
+ resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName)
+ if err != nil || len(resp) == 0 {
+ log.Println("Error finding a running instance. Retrying later...")
+ time.Sleep(time.Second * 10)
+ continue
+ }
+ switch {
+ case data.action == "POST":
+ log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
+ for _, inst := range resp {
+ k8sClient := KubernetesClient{}
+ err = k8sClient.init(inst.CloudRegion)
+ if err != nil {
+ log.Printf("Getting CloudRegion Information: %s", err.Error())
+ //Move onto the next cloud region
+ continue
+ }
+ data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
+ if err != nil {
+ log.Printf("Error Creating resources: %s", err.Error())
+ continue
+ }
+ }
+ //TODO: Needs to add code to call Kubectl create
+ case data.action == "PUT":
+ log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
+ //TODO: Needs to add code to call Kubectl apply
+ case data.action == "DELETE":
+ log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
+ for _, inst := range resp {
+ k8sClient := KubernetesClient{}
+ err = k8sClient.init(inst.CloudRegion)
+ if err != nil {
+ log.Printf("Getting CloudRegion Information: %s", err.Error())
+ //Move onto the next cloud region
+ continue
+ }
+ err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
+ if err != nil {
+ log.Printf("Error Deleting resources: %s", err.Error())
+ continue
+ }
+ }
+ }
+ }
+}
+
+//Resolve returns the path where the helm chart merged with
+//configuration overrides resides.
+var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+
+ var resTemplates []helm.KubernetesResourceTemplate
+
+ profile, err := rb.NewProfileClient().Get(rbName, rbVersion, profileName)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data")
+ }
+
+ t, err := rb.NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Getting Template")
+ }
+ if t.ChartName == "" {
+ return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found")
+ }
+
+ def, err := rb.NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template")
+ }
+
+ //Create a temp file in the system temp folder for values input
+ b, err := json.Marshal(p.Values)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data")
+ }
+ data, err := yaml.JSONToYAML(b)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML")
+ }
+
+ outputfile, err := ioutil.TempFile("", "helm-config-values-")
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file")
+ }
+ _, err = outputfile.Write([]byte(data))
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file")
+ }
+ defer outputfile.Close()
+
+ chartBasePath, err := rb.ExtractTarBall(bytes.NewBuffer(def))
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template")
+ }
+
+ helmClient := helm.NewTemplateClient(profile.KubernetesVersion,
+ profile.Namespace,
+ profile.ReleaseName)
+
+ chartPath := filepath.Join(chartBasePath, t.ChartName)
+ resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath,
+ []string{outputfile.Name()},
+ nil)
+ if err != nil {
+ return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml")
+ }
+ crl := configResourceList{
+ resourceTemplates: resTemplates,
+ profile: profile,
+ }
+
+ return crl, nil
+}
+
+// Get the Mutex for the Profile
+func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
+ profileData.Lock()
+ defer profileData.Unlock()
+ _, ok := profileData.profileLockMap[key]
+ if !ok {
+ profileData.profileLockMap[key] = &sync.Mutex{}
+ }
+ _, ok = profileData.resourceChannel[key]
+ if !ok {
+ profileData.resourceChannel[key] = make(chan configResourceList)
+ go scheduleResources(profileData.resourceChannel[key])
+ }
+ return profileData.profileLockMap[key], profileData.resourceChannel[key]
+}
diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go
new file mode 100644
index 00000000..11a300ff
--- /dev/null
+++ b/src/k8splugin/internal/app/config_test.go
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2018 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+ "k8splugin/internal/db"
+ "reflect"
+ "strings"
+ "testing"
+ // pkgerrors "github.com/pkg/errors"
+)
+
+func TestCreateConfig(t *testing.T) {
+ testCases := []struct {
+ label string
+ rbName string
+ rbVersion string
+ profileName string
+ inp Config
+ expectedError string
+ mockdb *db.MockEtcdClient
+ expected ConfigResult
+ }{
+ {
+ label: "Create Config",
+ rbName: "testdef1",
+ rbVersion: "v1",
+ profileName: "testprofile1",
+ inp: Config{
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ Values: map[string]interface{}{
+ "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"},
+ },
+ expected: ConfigResult{
+ DefinitionName: "testdef1",
+ DefinitionVersion: "v1",
+ ProfileName: "testprofile1",
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ ConfigVersion: 1,
+ },
+ expectedError: "",
+ mockdb: &db.MockEtcdClient{
+ Items: nil,
+ Err: nil,
+ },
+ },
+ }
+
+ for _, testCase := range testCases {
+ t.Run(testCase.label, func(t *testing.T) {
+ db.Etcd = testCase.mockdb
+ resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+ return configResourceList{}, nil
+ }
+ impl := NewConfigClient()
+ got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.expected, got) == false {
+ t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ " expected %v", got, testCase.expected)
+ }
+ }
+ })
+ }
+}
+
+func TestRollbackConfig(t *testing.T) {
+ testCases := []struct {
+ label string
+ rbName string
+ rbVersion string
+ profileName string
+ inp Config
+ inpUpdate1 Config
+ inpUpdate2 Config
+ expectedError string
+ mockdb *db.MockEtcdClient
+ expected1 ConfigResult
+ expected2 ConfigResult
+ expected3 ConfigResult
+ expected4 ConfigResult
+ rollbackConfig ConfigRollback
+ }{
+ {
+ label: "Rollback Config",
+ rbName: "testdef1",
+ rbVersion: "v1",
+ profileName: "testprofile1",
+ inp: Config{
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ Values: map[string]interface{}{
+ "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 10,\"replicas\": 2, }}"},
+ },
+ inpUpdate1: Config{
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ Values: map[string]interface{}{
+ "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 20,\"replicas\": 2, }}"},
+ },
+ inpUpdate2: Config{
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ Values: map[string]interface{}{
+ "values": "{\"namespace\": \"kafka\", \"topic\": {\"name\":\"orders\", \"cluster\":\"my-cluster\", \"partitions\": 30,\"replicas\": 2, }}"},
+ },
+ expected1: ConfigResult{
+ DefinitionName: "testdef1",
+ DefinitionVersion: "v1",
+ ProfileName: "testprofile1",
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ ConfigVersion: 1,
+ },
+ expected2: ConfigResult{
+ DefinitionName: "testdef1",
+ DefinitionVersion: "v1",
+ ProfileName: "testprofile1",
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ ConfigVersion: 2,
+ },
+ expected3: ConfigResult{
+ DefinitionName: "testdef1",
+ DefinitionVersion: "v1",
+ ProfileName: "testprofile1",
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ ConfigVersion: 3,
+ },
+ expected4: ConfigResult{
+ DefinitionName: "testdef1",
+ DefinitionVersion: "v1",
+ ProfileName: "testprofile1",
+ ConfigName: "testconfig1",
+ TemplateName: "testtemplate1",
+ ConfigVersion: 4,
+ },
+ expectedError: "",
+ mockdb: &db.MockEtcdClient{
+ Items: nil,
+ Err: nil,
+ },
+ },
+ }
+
+ for _, testCase := range testCases {
+ t.Run(testCase.label, func(t *testing.T) {
+ db.Etcd = testCase.mockdb
+ resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+ return configResourceList{}, nil
+ }
+ impl := NewConfigClient()
+ got, err := impl.Create(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.expected1, got) == false {
+ t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ " expected %v", got, testCase.expected1)
+ }
+ }
+ got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate1)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.expected2, got) == false {
+ t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ " expected %v", got, testCase.expected2)
+ }
+ }
+ got, err = impl.Update(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName, testCase.inpUpdate2)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.expected3, got) == false {
+ t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ " expected %v", got, testCase.expected3)
+ }
+ }
+ got, err = impl.Delete(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.expected4, got) == false {
+ t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ " expected %v", got, testCase.expected4)
+ }
+ }
+ testCase.rollbackConfig.AnyOf.ConfigVersion = "2"
+ err = impl.Rollback(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.rollbackConfig)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ }
+ rollbackConfig, err := impl.Get(testCase.rbName, testCase.rbVersion, testCase.profileName, testCase.inp.ConfigName)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Create returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.inpUpdate1, rollbackConfig) == false {
+ t.Errorf("Rollback config failed: got %v;"+
+ " expected %v", rollbackConfig, testCase.inpUpdate1)
+ }
+ }
+ })
+ }
+}