summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukasz Rajewski <lukasz.rajewski@orange.com>2021-10-01 09:35:35 +0200
committerLukasz Rajewski <lukasz.rajewski@orange.com>2021-10-04 12:06:25 +0200
commitbbeac9a596074d0af6e5be60448567517978a388 (patch)
treea3e0203797e00e6db9e31001e316e238a77d775a
parentdc62323aa7f6782d69c7ac6509eb270e86ef31bd (diff)
Further fixes for config delete operation
The issue was related with insufficient handlijg of different versions of config vs their delete operation handled by the plugin. Issue-ID: MULTICLOUD-1332 Signed-off-by: Lukasz Rajewski <lukasz.rajewski@orange.com> Change-Id: I90d896720fa89ebd66cb3290cdd9401272f5e3fd
-rw-r--r--src/k8splugin/api/api.go3
-rw-r--r--src/k8splugin/api/confighandler.go19
-rw-r--r--src/k8splugin/internal/app/client.go24
-rw-r--r--src/k8splugin/internal/app/config.go188
-rw-r--r--src/k8splugin/internal/app/config_backend.go154
-rw-r--r--src/k8splugin/internal/app/config_test.go3
-rw-r--r--src/k8splugin/internal/app/instance.go22
-rw-r--r--src/k8splugin/internal/db/etcd.go14
-rw-r--r--src/k8splugin/internal/db/etcd_testing.go9
-rw-r--r--src/k8splugin/plugins/generic/plugin.go107
-rw-r--r--src/k8splugin/plugins/namespace/plugin.go13
-rw-r--r--src/k8splugin/plugins/service/plugin.go41
12 files changed, 430 insertions, 167 deletions
diff --git a/src/k8splugin/api/api.go b/src/k8splugin/api/api.go
index e34b93a2..ed23f392 100644
--- a/src/k8splugin/api/api.go
+++ b/src/k8splugin/api/api.go
@@ -137,7 +137,8 @@ func NewRouter(defClient rb.DefinitionManager,
instRouter.HandleFunc("/instance/{instID}/config", configHandler.listHandler).Methods("GET")
instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.getHandler).Methods("GET")
instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.updateHandler).Methods("PUT")
- instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteHandler).Methods("DELETE")
+ instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteAllHandler).Methods("DELETE")
+ instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/delete", configHandler.deleteHandler).Methods("POST")
instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/rollback", configHandler.rollbackHandler).Methods("POST")
instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/tagit", configHandler.tagitHandler).Methods("POST")
diff --git a/src/k8splugin/api/confighandler.go b/src/k8splugin/api/confighandler.go
index b0a8f851..a4f08131 100644
--- a/src/k8splugin/api/confighandler.go
+++ b/src/k8splugin/api/confighandler.go
@@ -117,6 +117,22 @@ func (h rbConfigHandler) listHandler(w http.ResponseWriter, r *http.Request) {
}
// deleteHandler handles DELETE operations on a config
+func (h rbConfigHandler) deleteAllHandler(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ instanceID := vars["instID"]
+ cfgName := vars["cfgname"]
+
+ err := h.client.DeleteAll(instanceID, cfgName)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusAccepted)
+}
+
+// deleteHandler handles delete operations on a config creating its delete version
func (h rbConfigHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
instanceID := vars["instID"]
@@ -184,12 +200,13 @@ func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request)
}
var p app.ConfigRollback
- err := json.NewDecoder(r.Body).Decode(&p)
+ err := json.NewDecoder(r.Body).Decode(&p.AnyOf)
if err != nil {
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
err = h.client.Rollback(instanceID, cfgName, p)
+ //err = h.client.Cleanup(instanceID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go
index 9813333e..06c4c464 100644
--- a/src/k8splugin/internal/app/client.go
+++ b/src/k8splugin/internal/app/client.go
@@ -20,25 +20,30 @@ package app
import (
"context"
"io/ioutil"
+
appsv1 "k8s.io/api/apps/v1"
+
//appsv1beta1 "k8s.io/api/apps/v1beta1"
//appsv1beta2 "k8s.io/api/apps/v1beta2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
+
//extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
//apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
//apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"
"time"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ logger "log"
+
"github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
- logger "log"
pkgerrors "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -545,9 +550,18 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac
return pkgerrors.Wrap(err, "Error loading plugin")
}
- err = pluginImpl.Delete(resource, namespace, k)
- if err != nil {
- return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+ name, err := pluginImpl.Get(resource, namespace, k)
+
+ if (err == nil && name == resource.Name) || (err != nil && strings.Contains(err.Error(), "not found") == false) {
+ err = pluginImpl.Delete(resource, namespace, k)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+ }
+ } else {
+ log.Warn("Resource does not exist, Skipping delete", log.Fields{
+ "gvk": resource.GVK,
+ "resource": resource.Name,
+ })
}
return nil
diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go
index a25ab543..8952c16d 100644
--- a/src/k8splugin/internal/app/config.go
+++ b/src/k8splugin/internal/app/config.go
@@ -18,10 +18,11 @@
package app
import (
+ "log"
"strconv"
"strings"
- "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
pkgerrors "github.com/pkg/errors"
)
@@ -66,7 +67,9 @@ type ConfigManager interface {
Help() map[string]string
Update(instanceID, configName string, p Config) (ConfigResult, error)
Delete(instanceID, configName string) (ConfigResult, error)
+ DeleteAll(instanceID, configName string) error
Rollback(instanceID string, configName string, p ConfigRollback) error
+ Cleanup(instanceID string) error
Tagit(instanceID string, configName string, p ConfigTagit) error
}
@@ -94,7 +97,7 @@ func (v *ConfigClient) Help() map[string]string {
// Create an entry for the config in the database
func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) {
-
+ log.Printf("[Config Create] Instance %s", instanceID)
// Check required fields
if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 {
return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -120,10 +123,12 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "POST")
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, p, profileChannel, "POST", nil)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("POST result: %s", appliedResources)
// Create Config DB Entry
err = cs.createConfig(p)
if err != nil {
@@ -134,7 +139,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
instanceID: instanceID,
configName: p.ConfigName,
}
- version, err := cvs.createConfigVersion(p, Config{}, "POST")
+ version, err := cvs.createConfigVersion(p, Config{}, "POST", appliedResources)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
}
@@ -154,7 +159,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
// Update an entry for the config in the database
func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigResult, error) {
-
+ log.Printf("[Config Update] Instance %s Config %s", instanceID, configName)
// Check required fields
if len(p.Values) == 0 {
return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -177,10 +182,12 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "PUT")
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, p, profileChannel, "PUT", nil)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("PUT result: %s", appliedResources)
// Update Config DB Entry
configPrev, err := cs.updateConfig(p)
if err != nil {
@@ -191,7 +198,7 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
instanceID: instanceID,
configName: configName,
}
- version, err := cvs.createConfigVersion(p, configPrev, "PUT")
+ version, err := cvs.createConfigVersion(p, configPrev, "PUT", appliedResources)
if err != nil {
return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
}
@@ -247,8 +254,49 @@ func (v *ConfigClient) List(instanceID string) ([]Config, error) {
}
// Delete the Config from database
-func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+func (v *ConfigClient) DeleteAll(instanceID, configName string) error {
+ log.Printf("[Config Delete All] Instance %s Config %s", instanceID, configName)
+ // Check if Config exists
+ cs := ConfigStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ _, err := cs.getConfig()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ }
+ // Get Version Entry in DB for Config
+ cvs := ConfigVersionStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ currentVersion, err := cvs.getCurrentVersion(configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Current version get failed")
+ }
+ _, _, action, _, err := cvs.getConfigVersion(configName, currentVersion)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Config version get failed")
+ }
+
+ if action != "DELETE" {
+ _, err = v.Delete(instanceID, configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Config DELETE version failed")
+ }
+ }
+ // Delete Config from DB
+ _, err = cs.deleteConfig()
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Config DB Entry")
+ }
+ cvs.cleanupIstanceTags(configName)
+ return nil
+}
+// Apply update with delete operation
+func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+ log.Printf("[Config Delete] Instance %s Config %s", instanceID, configName)
// Resolving rbName, Version, etc. not to break response
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
if err != nil {
@@ -261,29 +309,39 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
}
p, err := cs.getConfig()
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Delete Error - Config doesn't exist")
}
lock, profileChannel := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
defer lock.Unlock()
- err = applyConfig(instanceID, p, profileChannel, "DELETE")
- if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
- }
- // Delete Config from DB
- configPrev, err := cs.deleteConfig()
- if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry")
- }
// Create Version Entry in DB for Config
cvs := ConfigVersionStore{
instanceID: instanceID,
configName: configName,
}
- version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE")
+ currentVersion, err := cvs.getCurrentVersion(configName)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Current version get failed")
+ }
+ _, _, _, resources, err := cvs.getConfigVersion(configName, currentVersion)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Config version get failed")
+ }
+
+ _, err = applyConfig(instanceID, p, profileChannel, "DELETE", resources)
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
+ }
+ log.Printf("DELETE resources: [%s]", resources)
+ // Update Config from DB
+ configPrev, err := cs.updateConfig(p)
if err != nil {
- return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry")
+ return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry")
+ }
+ version, err := cvs.createConfigVersion(p, configPrev, "DELETE", []helm.KubernetesResource{})
+ if err != nil {
+ return ConfigResult{}, pkgerrors.Wrap(err, "Create Delete Config Version DB Entry")
}
// Create Result structure
@@ -301,7 +359,7 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
// Rollback starts from current version and rollbacks to the version desired
func (v *ConfigClient) Rollback(instanceID string, configName string, rback ConfigRollback) error {
-
+ log.Printf("[Config Rollback] Instance %s Config %s", instanceID, configName)
var reqVersion string
var err error
@@ -342,40 +400,35 @@ func (v *ConfigClient) Rollback(instanceID string, configName string, rback Conf
//Rollback all the intermettinent configurations
for i := currentVersion; i > rollbackIndex; i-- {
- configNew, configPrev, action, err := cvs.getConfigVersion(configName, i)
+ configNew, configPrev, _, resources, err := cvs.getConfigVersion(configName, i)
if err != nil {
return pkgerrors.Wrap(err, "Rollback Get Config Version")
}
+ _, _, prevAction, _, err := cvs.getConfigVersion(configName, i-1)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Rollback Get Prev Config Version")
+ }
cs := ConfigStore{
instanceID: instanceID,
configName: configNew.ConfigName,
}
- if action == "PUT" {
- // PUT is proceeded by PUT or POST
- err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+ if prevAction != "DELETE" {
+ appliedResources, err := applyConfig(instanceID, configPrev, profileChannel, prevAction, nil)
if err != nil {
return pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("%s result: %s", prevAction, appliedResources)
_, err = cs.updateConfig(configPrev)
if err != nil {
return pkgerrors.Wrap(err, "Update Config DB Entry")
}
- } else if action == "POST" {
+ } else {
// POST is always preceeded by Config not existing
- err = applyConfig(instanceID, configNew, profileChannel, "DELETE")
- if err != nil {
- return pkgerrors.Wrap(err, "Delete Config failed")
- }
- _, err = cs.deleteConfig()
- if err != nil {
- return pkgerrors.Wrap(err, "Delete Config DB Entry")
- }
- } else if action == "DELETE" {
- // DELETE is proceeded by PUT or POST
- err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+ _, err := applyConfig(instanceID, configNew, profileChannel, prevAction, resources)
if err != nil {
return pkgerrors.Wrap(err, "Delete Config failed")
}
+ log.Printf("DELETE resources: %s", resources)
_, err = cs.updateConfig(configPrev)
if err != nil {
return pkgerrors.Wrap(err, "Update Config DB Entry")
@@ -394,11 +447,7 @@ func (v *ConfigClient) Rollback(instanceID string, configName string, rback Conf
// Tagit tags the current version with the tag provided
func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTagit) error {
-
- rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
- if err != nil {
- return pkgerrors.Wrap(err, "Retrieving model info")
- }
+ log.Printf("[Config Tag It] Instance %s Config %s", instanceID, configName)
lock, _ := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
@@ -408,38 +457,52 @@ func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTag
instanceID: instanceID,
configName: configName,
}
- currentVersion, err := cvs.getCurrentVersion(configName)
- if err != nil {
- return pkgerrors.Wrap(err, "Get Current Config Version ")
- }
- tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, configName, tag.TagName)
-
- err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+ err := cvs.tagCurrentVersion(configName, tag.TagName)
if err != nil {
- return pkgerrors.Wrap(err, "TagIt store DB")
+ return pkgerrors.Wrap(err, "Tag of current version failed")
}
return nil
}
// GetTagVersion returns the version associated with the tag
func (v *ConfigClient) GetTagVersion(instanceID, configName string, tagName string) (string, error) {
-
- rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
+ log.Printf("[Config Get Tag Version] Instance %s Config %s", instanceID, configName)
+ cvs := ConfigVersionStore{
+ instanceID: instanceID,
+ configName: configName,
+ }
+ value, err := cvs.getTagVersion(configName, tagName)
if err != nil {
- return "", pkgerrors.Wrap(err, "Retrieving model info")
+ return "", pkgerrors.Wrap(err, "Tag of current version failed")
}
- tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, configName, tagName)
- value, err := db.Etcd.Get(tagKey)
+ return value, nil
+}
+
+// Cleanup version used only when instance is being deleted. We do not pass errors and we try to delete data
+func (v *ConfigClient) Cleanup(instanceID string) error {
+ log.Printf("[Config Cleanup] Instance %s", instanceID)
+ configs, err := v.List(instanceID)
+
if err != nil {
- return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+ return pkgerrors.Wrap(err, "Retrieving active config list info")
+ }
+
+ for _, config := range configs {
+ err = v.DeleteAll(instanceID, config.ConfigName)
+ if err != nil {
+ log.Printf("Config %s delete failed: %s", config.ConfigName, err.Error())
+ }
}
- return string(value), nil
+
+ removeProfileData(instanceID)
+
+ return nil
}
// ApplyAllConfig starts from first configuration version and applies all versions in sequence
func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) error {
-
+ log.Printf("[Config Apply All] Instance %s Config %s", instanceID, configName)
lock, profileChannel := getProfileData(instanceID)
// Acquire per profile Mutex
lock.Lock()
@@ -459,14 +522,19 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) erro
//Apply all configurations
var i uint
for i = 1; i <= currentVersion; i++ {
- configNew, _, action, err := cvs.getConfigVersion(configName, i)
+ configNew, _, action, resources, err := cvs.getConfigVersion(configName, i)
if err != nil {
return pkgerrors.Wrap(err, "Get Config Version")
}
- err = applyConfig(instanceID, configNew, profileChannel, action)
+ if action != "DELETE" {
+ resources = nil
+ }
+ var appliedResources ([]helm.KubernetesResource)
+ appliedResources, err = applyConfig(instanceID, configNew, profileChannel, action, resources)
if err != nil {
return pkgerrors.Wrap(err, "Apply Config failed")
}
+ log.Printf("%s result: %s", action, appliedResources)
}
return nil
}
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go
index 3e5d8a3f..4fedb386 100644
--- a/src/k8splugin/internal/app/config_backend.go
+++ b/src/k8splugin/internal/app/config_backend.go
@@ -38,9 +38,10 @@ import (
//ConfigStore contains the values that will be stored in the database
type configVersionDBContent struct {
- ConfigNew Config `json:"config-new"`
- ConfigPrev Config `json:"config-prev"`
- Action string `json:"action"` // CRUD opration for this config
+ ConfigNew Config `json:"config-new"`
+ ConfigPrev Config `json:"config-prev"`
+ Action string `json:"action"` // CRUD opration for this config
+ Resources []helm.KubernetesResource `json:"resources"`
}
//ConfigStore to Store the Config
@@ -57,7 +58,8 @@ type ConfigVersionStore struct {
type configResourceList struct {
resourceTemplates []helm.KubernetesResourceTemplate
- createdResources []helm.KubernetesResource
+ resources []helm.KubernetesResource
+ updatedResources chan []helm.KubernetesResource
profile rb.Profile
action string
}
@@ -72,6 +74,7 @@ const (
storeName = "config"
tagCounter = "counter"
tagVersion = "configversion"
+ tagName = "configtag"
tagConfig = "configdata"
)
@@ -223,8 +226,37 @@ func (c ConfigStore) deleteConfig() (Config, error) {
return configPrev, nil
}
+//Cleanup stored data in etcd before instance is being deleted
+func (c ConfigVersionStore) cleanupIstanceTags(configName string) error {
+
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Retrieving model info")
+ }
+
+ versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName)
+ err = db.Etcd.DeletePrefix(versionKey)
+ if err != nil {
+ log.Printf("Deleting versions of instance failed: %s", err.Error())
+ }
+
+ counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
+ err = db.Etcd.DeletePrefix(counterKey)
+ if err != nil {
+ log.Printf("Deleting counters of instance failed: %s", err.Error())
+ }
+
+ nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName)
+ err = db.Etcd.DeletePrefix(nameKey)
+ if err != nil {
+ log.Printf("Deleting counters of instance failed: %s", err.Error())
+ }
+
+ return nil
+}
+
// Create a version for the configuration. If previous config provided that is also stored
-func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
+func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) {
configName := ""
if configNew.ConfigName != "" {
@@ -249,6 +281,7 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac
cs.Action = action
cs.ConfigNew = configNew
cs.ConfigPrev = configPrev
+ cs.Resources = resources //[]helm.KubernetesResource{}
configValue, err := db.Serialize(cs)
if err != nil {
@@ -288,27 +321,27 @@ func (c ConfigVersionStore) deleteConfigVersion(configName string) error {
// Read the specified version of the configuration and return its prev and current value.
// Also returns the action for the config version
-func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, error) {
+func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) {
rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
}
versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version)))
configBytes, err := db.Etcd.Get(versionKey)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ")
}
if configBytes != nil {
pr := configVersionDBContent{}
err = db.DeSerialize(string(configBytes), &pr)
if err != nil {
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version")
}
- return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
+ return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil
}
- return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
+ return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ")
}
// Get the counter for the version
@@ -318,7 +351,7 @@ func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) {
if err != nil {
return 0, pkgerrors.Wrap(err, "Retrieving model info")
}
- cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter)
+ cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
value, err := db.Etcd.Get(cfgKey)
if err != nil {
@@ -344,7 +377,7 @@ func (c ConfigVersionStore) updateVersion(configName string, counter uint) error
if err != nil {
return pkgerrors.Wrap(err, "Retrieving model info")
}
- cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter)
+ cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
if err != nil {
return pkgerrors.Wrap(err, "Counter DB Entry")
@@ -386,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion(configName string) error {
return nil
}
+// Get tag version
+func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) {
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Retrieving model info")
+ }
+ tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+ value, err := db.Etcd.Get(tagKey)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+ }
+ return string(value), nil
+}
+
+// Tag current version
+func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error {
+ currentVersion, err := c.getCurrentVersion(configName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get Current Config Version ")
+ }
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Retrieving model info")
+ }
+ tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+ err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+ if err != nil {
+ return pkgerrors.Wrap(err, "TagIt store DB")
+ }
+ return nil
+}
+
// Apply Config
-func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
+func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) {
rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID)
if err != nil {
- return pkgerrors.Wrap(err, "Retrieving model info")
+ return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
}
// Get Template and Resolve the template with values
crl, err := resolve(rbName, rbVersion, profileName, p, releaseName)
if err != nil {
- return pkgerrors.Wrap(err, "Resolve Config")
+ return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config")
}
+ var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource)
crl.action = action
+ crl.resources = resources
+ crl.updatedResources = updatedResources
// Send the configResourceList to the channel. Using select for non-blocking channel
+ log.Printf("Before Sent to goroutine %v", crl.profile)
select {
case pChannel <- crl:
log.Printf("Message Sent to goroutine %v", crl.profile)
default:
}
- return nil
+ var resultResources []helm.KubernetesResource = <-updatedResources
+ return resultResources, nil
}
// Per Profile Go routine to apply the configuration to Cloud Region
func scheduleResources(c chan configResourceList) {
// Keep thread running
+ log.Printf("[scheduleResources]: START thread")
for {
data := <-c
//TODO: ADD Check to see if Application running
ic := NewInstanceClient()
resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
- if err != nil || len(resp) == 0 {
+ if (err != nil || len(resp) == 0) && data.action != "STOP" {
log.Println("Error finding a running instance. Retrying later...")
- time.Sleep(time.Second * 10)
+ data.updatedResources <- []helm.KubernetesResource{}
continue
}
+ breakThread := false
switch {
case data.action == "POST":
log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
+ var resources []helm.KubernetesResource
for _, inst := range resp {
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -434,11 +509,11 @@ func scheduleResources(c chan configResourceList) {
continue
}
//assuming - the resource is not exist already
- data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
+ resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
errCreate := err
if err != nil {
// assuming - the err represent the resource is already exist, so going for update
- data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+ resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
if err != nil {
log.Printf("Error Creating resources: %s", errCreate.Error())
log.Printf("Error Updating resources: %s", err.Error())
@@ -446,8 +521,10 @@ func scheduleResources(c chan configResourceList) {
}
}
}
+ data.updatedResources <- resources
case data.action == "PUT":
log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
+ var resources []helm.KubernetesResource
for _, inst := range resp {
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -456,14 +533,16 @@ func scheduleResources(c chan configResourceList) {
//Move onto the next cloud region
continue
}
- data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+
+ resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
if err != nil {
log.Printf("Error Updating resources: %s", err.Error())
continue
}
}
+ data.updatedResources <- resources
case data.action == "DELETE":
- log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
+ log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources)
for _, inst := range resp {
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -472,14 +551,22 @@ func scheduleResources(c chan configResourceList) {
//Move onto the next cloud region
continue
}
- err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
+ err = k8sClient.deleteResources(data.resources, inst.Namespace)
if err != nil {
log.Printf("Error Deleting resources: %s", err.Error())
continue
}
}
+ data.updatedResources <- []helm.KubernetesResource{}
+
+ case data.action == "STOP":
+ breakThread = true
+ }
+ if breakThread {
+ break
}
}
+ log.Printf("[scheduleResources]: STOP thread")
}
//Resolve returns the path where the helm chart merged with
@@ -570,6 +657,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
if !ok {
profileData.resourceChannel[key] = make(chan configResourceList)
go scheduleResources(profileData.resourceChannel[key])
+ time.Sleep(time.Second * 5)
}
return profileData.profileLockMap[key], profileData.resourceChannel[key]
}
+
+func removeProfileData(key string) {
+ profileData.Lock()
+ defer profileData.Unlock()
+ _, ok := profileData.profileLockMap[key]
+ if ok {
+ delete(profileData.profileLockMap, key)
+ }
+ _, ok = profileData.resourceChannel[key]
+ if ok {
+ log.Printf("Stop config thread for %s", key)
+ crl := configResourceList{
+ action: "STOP",
+ }
+ profileData.resourceChannel[key] <- crl
+ delete(profileData.resourceChannel, key)
+ }
+}
diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go
index dc4779d8..0cc3c3ce 100644
--- a/src/k8splugin/internal/app/config_test.go
+++ b/src/k8splugin/internal/app/config_test.go
@@ -319,3 +319,6 @@ func TestRollbackConfig(t *testing.T) {
})
}
}
+
+func main() {
+}
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index 0f1f3d7e..b7f382ad 100644
--- a/src/k8splugin/internal/app/instance.go
+++ b/src/k8splugin/internal/app/instance.go
@@ -709,7 +709,8 @@ func (v *InstanceClient) Delete(id string) error {
return nil
} else if inst.Status != "DONE" {
//Recover is ongoing, do nothing here
- return nil
+ //return nil
+ //TODO: implement recovery
}
k8sClient := KubernetesClient{}
@@ -718,12 +719,6 @@ func (v *InstanceClient) Delete(id string) error {
return pkgerrors.Wrap(err, "Getting CloudRegion Information")
}
- configClient := NewConfigClient()
- configs, err := configClient.List(id)
- if err != nil {
- return pkgerrors.Wrap(err, "Getting Configs Information")
- }
-
inst.Status = "PRE-DELETE"
inst.HookProgress = ""
err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
@@ -751,15 +746,10 @@ func (v *InstanceClient) Delete(id string) error {
log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
}
- if len(configs) > 0 {
- log.Printf("Deleting config resources first")
- for _, config := range configs {
- log.Printf("Deleting Config %s Resources", config.ConfigName)
- _, err = configClient.Delete(id, config.ConfigName)
- if err != nil {
- return pkgerrors.Wrap(err, "Deleting Config Resources")
- }
- }
+ configClient := NewConfigClient()
+ err = configClient.Cleanup(id)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Cleanup Config Resources")
}
err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
diff --git a/src/k8splugin/internal/db/etcd.go b/src/k8splugin/internal/db/etcd.go
index a435b435..e455cc1a 100644
--- a/src/k8splugin/internal/db/etcd.go
+++ b/src/k8splugin/internal/db/etcd.go
@@ -39,6 +39,7 @@ type EtcdStore interface {
GetAll(key string) ([][]byte, error)
Put(key, value string) error
Delete(key string) error
+ DeletePrefix(keyPrefix string) error
}
// EtcdClient for Etcd
@@ -151,3 +152,16 @@ func (e EtcdClient) Delete(key string) error {
}
return nil
}
+
+// Delete values by prefix from Etcd DB
+func (e EtcdClient) DeletePrefix(keyPrefix string) error {
+
+ if e.cli == nil {
+ return pkgerrors.Errorf("Etcd Client not initialized")
+ }
+ _, err := e.cli.Delete(context.Background(), keyPrefix, clientv3.WithPrefix())
+ if err != nil {
+ return pkgerrors.Errorf("Delete prefix failed etcd entry:%s", err.Error())
+ }
+ return nil
+}
diff --git a/src/k8splugin/internal/db/etcd_testing.go b/src/k8splugin/internal/db/etcd_testing.go
index 9dfcad82..4b4dfe3e 100644
--- a/src/k8splugin/internal/db/etcd_testing.go
+++ b/src/k8splugin/internal/db/etcd_testing.go
@@ -55,3 +55,12 @@ func (c *MockEtcdClient) Delete(key string) error {
delete(c.Items, key)
return c.Err
}
+
+func (c *MockEtcdClient) DeletePrefix(key string) error {
+ for kvKey := range c.Items {
+ if strings.HasPrefix(kvKey, key) {
+ delete(c.Items, key)
+ }
+ }
+ return c.Err
+}
diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go
index f71c436c..5815b74f 100644
--- a/src/k8splugin/plugins/generic/plugin.go
+++ b/src/k8splugin/plugins/generic/plugin.go
@@ -22,6 +22,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/kubernetes"
+
//appsv1beta1 "k8s.io/api/apps/v1beta1"
//appsv1beta2 "k8s.io/api/apps/v1beta2"
batchv1 "k8s.io/api/batch/v1"
@@ -340,58 +341,58 @@ func (g genericPlugin) Create(yamlFilePath string, namespace string, client plug
// Update deployment object in a specific Kubernetes cluster
func (g genericPlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
- if namespace == "" {
- namespace = "default"
- }
-
- //Decode the yaml file to create a runtime.Object
- unstruct := &unstructured.Unstructured{}
- //Ignore the returned obj as we expect the data in unstruct
- _, err := utils.DecodeYAML(yamlFilePath, unstruct)
- if err != nil {
- return "", pkgerrors.Wrap(err, "Decode deployment object error")
- }
-
- dynClient := client.GetDynamicClient()
- mapper := client.GetMapper()
-
- gvk := unstruct.GroupVersionKind()
- mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
- if err != nil {
- return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
- }
-
- //Add the tracking label to all resources created here
- labels := unstruct.GetLabels()
- //Check if labels exist for this object
- if labels == nil {
- labels = map[string]string{}
- }
- labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
- unstruct.SetLabels(labels)
-
- // This checks if the resource we are creating has a podSpec in it
- // Eg: Deployment, StatefulSet, Job etc..
- // If a PodSpec is found, the label will be added to it too.
- plugin.TagPodsIfPresent(unstruct, client.GetInstanceID())
-
- gvr := mapping.Resource
- var updatedObj *unstructured.Unstructured
-
- switch mapping.Scope.Name() {
- case meta.RESTScopeNameNamespace:
- updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
- case meta.RESTScopeNameRoot:
- updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
- default:
- return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
- }
-
- if err != nil {
- return "", pkgerrors.Wrap(err, "Update object error")
- }
-
- return updatedObj.GetName(), nil
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ //Decode the yaml file to create a runtime.Object
+ unstruct := &unstructured.Unstructured{}
+ //Ignore the returned obj as we expect the data in unstruct
+ _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Decode deployment object error")
+ }
+
+ dynClient := client.GetDynamicClient()
+ mapper := client.GetMapper()
+
+ gvk := unstruct.GroupVersionKind()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
+ }
+
+ //Add the tracking label to all resources created here
+ labels := unstruct.GetLabels()
+ //Check if labels exist for this object
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ unstruct.SetLabels(labels)
+
+ // This checks if the resource we are creating has a podSpec in it
+ // Eg: Deployment, StatefulSet, Job etc..
+ // If a PodSpec is found, the label will be added to it too.
+ plugin.TagPodsIfPresent(unstruct, client.GetInstanceID())
+
+ gvr := mapping.Resource
+ var updatedObj *unstructured.Unstructured
+
+ switch mapping.Scope.Name() {
+ case meta.RESTScopeNameNamespace:
+ updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
+ case meta.RESTScopeNameRoot:
+ updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
+ default:
+ return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
+ }
+
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Update object error")
+ }
+
+ return updatedObj.GetName(), nil
}
// Get an existing resource hosted in a specific Kubernetes cluster
@@ -425,7 +426,7 @@ func (g genericPlugin) Get(resource helm.KubernetesResource,
}
if err != nil {
- return "", pkgerrors.Wrap(err, "Delete object error")
+ return "", pkgerrors.Wrap(err, "Get object error")
}
return unstruct.GetName(), nil
diff --git a/src/k8splugin/plugins/namespace/plugin.go b/src/k8splugin/plugins/namespace/plugin.go
index 8732442e..6c6d1f6c 100644
--- a/src/k8splugin/plugins/namespace/plugin.go
+++ b/src/k8splugin/plugins/namespace/plugin.go
@@ -21,10 +21,10 @@ import (
pkgerrors "github.com/pkg/errors"
coreV1 "k8s.io/api/core/v1"
- metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/api/meta"
+ metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -60,7 +60,12 @@ func (p namespacePlugin) Create(yamlFilePath string, namespace string, client pl
Name: namespace,
},
}
- _, err := client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{})
+ existingNs, err := client.GetStandardClient().CoreV1().Namespaces().Get(context.TODO(), namespace, metaV1.GetOptions{})
+ if err == nil && len(existingNs.ManagedFields) > 0 && existingNs.ManagedFields[0].Manager == "k8plugin" {
+ log.Printf("Namespace (%s) already ensured by plugin. Skip", namespace)
+ return namespace, nil
+ }
+ _, err = client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{})
if err != nil {
return "", pkgerrors.Wrap(err, "Create Namespace error")
}
@@ -128,5 +133,5 @@ func (p namespacePlugin) List(gvk schema.GroupVersionKind, namespace string, cli
func (p namespacePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
- return "", nil
+ return namespace, nil
}
diff --git a/src/k8splugin/plugins/service/plugin.go b/src/k8splugin/plugins/service/plugin.go
index aa5c685c..52dd4591 100644
--- a/src/k8splugin/plugins/service/plugin.go
+++ b/src/k8splugin/plugins/service/plugin.go
@@ -21,10 +21,10 @@ import (
pkgerrors "github.com/pkg/errors"
coreV1 "k8s.io/api/core/v1"
- metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/api/meta"
+ metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -156,8 +156,43 @@ func (p servicePlugin) Get(resource helm.KubernetesResource, namespace string, c
return service.Name, nil
}
+// Update a service object in a specific Kubernetes cluster
func (p servicePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
+ if namespace == "" {
+ namespace = "default"
+ }
- return "", nil
+ obj, err := utils.DecodeYAML(yamlFilePath, nil)
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Decode service object error")
+ }
+ service, ok := obj.(*coreV1.Service)
+ if !ok {
+ return "", pkgerrors.New("Decoded object contains another resource different than Service")
+ }
+ service.Namespace = namespace
+
+ existingService, err := client.GetStandardClient().CoreV1().Services(namespace).Get(context.TODO(), service.Name, metaV1.GetOptions{})
+ if err == nil {
+ service.ResourceVersion = existingService.ResourceVersion
+ service.Spec.ClusterIP = existingService.Spec.ClusterIP
+ } else {
+ return p.Create(yamlFilePath, namespace, client)
+ }
+ labels := service.GetLabels()
+ //Check if labels exist for this object
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+ service.SetLabels(labels)
+
+ _, err = client.GetStandardClient().CoreV1().Services(namespace).Update(context.TODO(), service, metaV1.UpdateOptions{})
+
+ if err != nil {
+ return "", pkgerrors.Wrap(err, "Update object error")
+ }
+
+ return service.Name, nil
}