diff options
Diffstat (limited to 'src/k8splugin/internal/app/config_backend.go')
-rw-r--r-- | src/k8splugin/internal/app/config_backend.go | 154 |
1 files changed, 130 insertions, 24 deletions
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 3e5d8a3f..4fedb386 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -38,9 +38,10 @@ import ( //ConfigStore contains the values that will be stored in the database type configVersionDBContent struct { - ConfigNew Config `json:"config-new"` - ConfigPrev Config `json:"config-prev"` - Action string `json:"action"` // CRUD opration for this config + ConfigNew Config `json:"config-new"` + ConfigPrev Config `json:"config-prev"` + Action string `json:"action"` // CRUD opration for this config + Resources []helm.KubernetesResource `json:"resources"` } //ConfigStore to Store the Config @@ -57,7 +58,8 @@ type ConfigVersionStore struct { type configResourceList struct { resourceTemplates []helm.KubernetesResourceTemplate - createdResources []helm.KubernetesResource + resources []helm.KubernetesResource + updatedResources chan []helm.KubernetesResource profile rb.Profile action string } @@ -72,6 +74,7 @@ const ( storeName = "config" tagCounter = "counter" tagVersion = "configversion" + tagName = "configtag" tagConfig = "configdata" ) @@ -223,8 +226,37 @@ func (c ConfigStore) deleteConfig() (Config, error) { return configPrev, nil } +//Cleanup stored data in etcd before instance is being deleted +func (c ConfigVersionStore) cleanupIstanceTags(configName string) error { + + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName) + err = db.Etcd.DeletePrefix(versionKey) + if err != nil { + log.Printf("Deleting versions of instance failed: %s", err.Error()) + } + + counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) + err = db.Etcd.DeletePrefix(counterKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName) + err = db.Etcd.DeletePrefix(nameKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + return nil +} + // Create a version for the configuration. If previous config provided that is also stored -func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { +func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) { configName := "" if configNew.ConfigName != "" { @@ -249,6 +281,7 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac cs.Action = action cs.ConfigNew = configNew cs.ConfigPrev = configPrev + cs.Resources = resources //[]helm.KubernetesResource{} configValue, err := db.Serialize(cs) if err != nil { @@ -288,27 +321,27 @@ func (c ConfigVersionStore) deleteConfigVersion(configName string) error { // Read the specified version of the configuration and return its prev and current value. // Also returns the action for the config version -func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, error) { +func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version))) configBytes, err := db.Etcd.Get(versionKey) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ") } if configBytes != nil { pr := configVersionDBContent{} err = db.DeSerialize(string(configBytes), &pr) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version") } - return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil + return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil } - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ") } // Get the counter for the version @@ -318,7 +351,7 @@ func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) { if err != nil { return 0, pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) value, err := db.Etcd.Get(cfgKey) if err != nil { @@ -344,7 +377,7 @@ func (c ConfigVersionStore) updateVersion(configName string, counter uint) error if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) if err != nil { return pkgerrors.Wrap(err, "Counter DB Entry") @@ -386,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion(configName string) error { return nil } +// Get tag version +func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) { + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return "", pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + value, err := db.Etcd.Get(tagKey) + if err != nil { + return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + } + return string(value), nil +} + +// Tag current version +func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error { + currentVersion, err := c.getCurrentVersion(configName) + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) + if err != nil { + return pkgerrors.Wrap(err, "TagIt store DB") + } + return nil +} + // Apply Config -func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error { +func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) { rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID) if err != nil { - return pkgerrors.Wrap(err, "Retrieving model info") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } // Get Template and Resolve the template with values crl, err := resolve(rbName, rbVersion, profileName, p, releaseName) if err != nil { - return pkgerrors.Wrap(err, "Resolve Config") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config") } + var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource) crl.action = action + crl.resources = resources + crl.updatedResources = updatedResources // Send the configResourceList to the channel. Using select for non-blocking channel + log.Printf("Before Sent to goroutine %v", crl.profile) select { case pChannel <- crl: log.Printf("Message Sent to goroutine %v", crl.profile) default: } - return nil + var resultResources []helm.KubernetesResource = <-updatedResources + return resultResources, nil } // Per Profile Go routine to apply the configuration to Cloud Region func scheduleResources(c chan configResourceList) { // Keep thread running + log.Printf("[scheduleResources]: START thread") for { data := <-c //TODO: ADD Check to see if Application running ic := NewInstanceClient() resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) - if err != nil || len(resp) == 0 { + if (err != nil || len(resp) == 0) && data.action != "STOP" { log.Println("Error finding a running instance. Retrying later...") - time.Sleep(time.Second * 10) + data.updatedResources <- []helm.KubernetesResource{} continue } + breakThread := false switch { case data.action == "POST": log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) + var resources []helm.KubernetesResource for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -434,11 +509,11 @@ func scheduleResources(c chan configResourceList) { continue } //assuming - the resource is not exist already - data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) errCreate := err if err != nil { // assuming - the err represent the resource is already exist, so going for update - data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) if err != nil { log.Printf("Error Creating resources: %s", errCreate.Error()) log.Printf("Error Updating resources: %s", err.Error()) @@ -446,8 +521,10 @@ func scheduleResources(c chan configResourceList) { } } } + data.updatedResources <- resources case data.action == "PUT": log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) + var resources []helm.KubernetesResource for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -456,14 +533,16 @@ func scheduleResources(c chan configResourceList) { //Move onto the next cloud region continue } - data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) if err != nil { log.Printf("Error Updating resources: %s", err.Error()) continue } } + data.updatedResources <- resources case data.action == "DELETE": - log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) + log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources) for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -472,14 +551,22 @@ func scheduleResources(c chan configResourceList) { //Move onto the next cloud region continue } - err = k8sClient.deleteResources(data.createdResources, inst.Namespace) + err = k8sClient.deleteResources(data.resources, inst.Namespace) if err != nil { log.Printf("Error Deleting resources: %s", err.Error()) continue } } + data.updatedResources <- []helm.KubernetesResource{} + + case data.action == "STOP": + breakThread = true + } + if breakThread { + break } } + log.Printf("[scheduleResources]: STOP thread") } //Resolve returns the path where the helm chart merged with @@ -570,6 +657,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) { if !ok { profileData.resourceChannel[key] = make(chan configResourceList) go scheduleResources(profileData.resourceChannel[key]) + time.Sleep(time.Second * 5) } return profileData.profileLockMap[key], profileData.resourceChannel[key] } + +func removeProfileData(key string) { + profileData.Lock() + defer profileData.Unlock() + _, ok := profileData.profileLockMap[key] + if ok { + delete(profileData.profileLockMap, key) + } + _, ok = profileData.resourceChannel[key] + if ok { + log.Printf("Stop config thread for %s", key) + crl := configResourceList{ + action: "STOP", + } + profileData.resourceChannel[key] <- crl + delete(profileData.resourceChannel, key) + } +} |