diff options
Diffstat (limited to 'src')
27 files changed, 1016 insertions, 271 deletions
diff --git a/src/k8splugin/api/api.go b/src/k8splugin/api/api.go index 94fb9b34..a3e53dc8 100644 --- a/src/k8splugin/api/api.go +++ b/src/k8splugin/api/api.go @@ -104,6 +104,7 @@ func NewRouter(defClient rb.DefinitionManager, resRouter.HandleFunc("/definition/{rbname}", defHandler.listVersionsHandler).Methods("GET") resRouter.HandleFunc("/definition", defHandler.listAllHandler).Methods("GET") resRouter.HandleFunc("/definition/{rbname}/{rbversion}", defHandler.getHandler).Methods("GET") + resRouter.HandleFunc("/definition/{rbname}/{rbversion}", defHandler.updateHandler).Methods("PUT") resRouter.HandleFunc("/definition/{rbname}/{rbversion}", defHandler.deleteHandler).Methods("DELETE") //Setup resource bundle profile routes @@ -115,6 +116,7 @@ func NewRouter(defClient rb.DefinitionManager, resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile", profileHandler.listHandler).Methods("GET") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile/{prname}/content", profileHandler.uploadHandler).Methods("POST") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile/{prname}", profileHandler.getHandler).Methods("GET") + resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile/{prname}", profileHandler.updateHandler).Methods("PUT") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/profile/{prname}", profileHandler.deleteHandler).Methods("DELETE") // Config Template @@ -123,8 +125,10 @@ func NewRouter(defClient rb.DefinitionManager, } templateHandler := rbTemplateHandler{client: templateClient} resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template", templateHandler.createHandler).Methods("POST") + resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template", templateHandler.listHandler).Methods("GET") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template/{tname}/content", templateHandler.uploadHandler).Methods("POST") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template/{tname}", templateHandler.getHandler).Methods("GET") + resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template/{tname}", templateHandler.updateHandler).Methods("PUT") resRouter.HandleFunc("/definition/{rbname}/{rbversion}/config-template/{tname}", templateHandler.deleteHandler).Methods("DELETE") // Config value @@ -136,9 +140,10 @@ func NewRouter(defClient rb.DefinitionManager, instRouter.HandleFunc("/instance/{instID}/config", configHandler.listHandler).Methods("GET") instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.getHandler).Methods("GET") instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.updateHandler).Methods("PUT") - instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteHandler).Methods("DELETE") - instRouter.HandleFunc("/instance/{instID}/config/rollback", configHandler.rollbackHandler).Methods("POST") - instRouter.HandleFunc("/instance/{instID}/config/tagit", configHandler.tagitHandler).Methods("POST") + instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteAllHandler).Methods("DELETE") + instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/delete", configHandler.deleteHandler).Methods("POST") + instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/rollback", configHandler.rollbackHandler).Methods("POST") + instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/tagit", configHandler.tagitHandler).Methods("POST") // Instance Healthcheck API if healthcheckClient == nil { diff --git a/src/k8splugin/api/confighandler.go b/src/k8splugin/api/confighandler.go index c2236378..a4f08131 100644 --- a/src/k8splugin/api/confighandler.go +++ b/src/k8splugin/api/confighandler.go @@ -117,6 +117,22 @@ func (h rbConfigHandler) listHandler(w http.ResponseWriter, r *http.Request) { } // deleteHandler handles DELETE operations on a config +func (h rbConfigHandler) deleteAllHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceID := vars["instID"] + cfgName := vars["cfgname"] + + err := h.client.DeleteAll(instanceID, cfgName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) +} + +// deleteHandler handles delete operations on a config creating its delete version func (h rbConfigHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) instanceID := vars["instID"] @@ -176,6 +192,7 @@ func (h rbConfigHandler) updateHandler(w http.ResponseWriter, r *http.Request) { func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) instanceID := vars["instID"] + cfgName := vars["cfgname"] if r.Body == nil { http.Error(w, "Empty body", http.StatusBadRequest) @@ -183,12 +200,13 @@ func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request) } var p app.ConfigRollback - err := json.NewDecoder(r.Body).Decode(&p) + err := json.NewDecoder(r.Body).Decode(&p.AnyOf) if err != nil { http.Error(w, err.Error(), http.StatusUnprocessableEntity) return } - err = h.client.Rollback(instanceID, p) + err = h.client.Rollback(instanceID, cfgName, p) + //err = h.client.Cleanup(instanceID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -200,6 +218,7 @@ func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request) func (h rbConfigHandler) tagitHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) instanceID := vars["instID"] + cfgName := vars["cfgname"] if r.Body == nil { http.Error(w, "Empty body", http.StatusBadRequest) @@ -213,7 +232,7 @@ func (h rbConfigHandler) tagitHandler(w http.ResponseWriter, r *http.Request) { return } - err = h.client.Tagit(instanceID, p) + err = h.client.Tagit(instanceID, cfgName, p) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/src/k8splugin/api/configtemplatehandler.go b/src/k8splugin/api/configtemplatehandler.go index bd7c2db9..e8750fd8 100644 --- a/src/k8splugin/api/configtemplatehandler.go +++ b/src/k8splugin/api/configtemplatehandler.go @@ -20,9 +20,10 @@ import ( "encoding/json" "io" "io/ioutil" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb" "net/http" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb" + "github.com/gorilla/mux" ) @@ -58,7 +59,7 @@ func (h rbTemplateHandler) createHandler(w http.ResponseWriter, r *http.Request) return } - err = h.client.Create(rbName, rbVersion, p) + err = h.client.CreateOrUpdate(rbName, rbVersion, p, false) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -122,6 +123,78 @@ func (h rbTemplateHandler) getHandler(w http.ResponseWriter, r *http.Request) { } } +// createHandler handles creation of the template entry in the database +func (h rbTemplateHandler) updateHandler(w http.ResponseWriter, r *http.Request) { + var p rb.ConfigTemplate + + vars := mux.Vars(r) + rbName := vars["rbname"] + rbVersion := vars["rbversion"] + templateName := vars["tname"] + + err := json.NewDecoder(r.Body).Decode(&p) + switch { + case err == io.EOF: + http.Error(w, "Empty body", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Name is required. + if p.TemplateName == "" { + http.Error(w, "Missing name in POST request", http.StatusBadRequest) + return + } + + ret, err := h.client.Get(rbName, rbVersion, templateName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if p.TemplateName != "" && p.TemplateName != ret.TemplateName { + http.Error(w, "Template name mismatch", http.StatusBadRequest) + return + } + + err = h.client.CreateOrUpdate(rbName, rbVersion, p, true) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + err = json.NewEncoder(w).Encode(p) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// getHandler handles GET operations on a particular template +func (h rbTemplateHandler) listHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + rbName := vars["rbname"] + rbVersion := vars["rbversion"] + + ret, err := h.client.List(rbName, rbVersion) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(ret) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + // deleteHandler handles DELETE operations on a template func (h rbTemplateHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) diff --git a/src/k8splugin/api/defhandler.go b/src/k8splugin/api/defhandler.go index 480d4be5..3dea8ade 100644 --- a/src/k8splugin/api/defhandler.go +++ b/src/k8splugin/api/defhandler.go @@ -35,7 +35,7 @@ type rbDefinitionHandler struct { client rb.DefinitionManager } -// createHandler handles creation of the definition entry in the database +// createOrUpdateHandler handles creation of the definition entry in the database func (h rbDefinitionHandler) createHandler(w http.ResponseWriter, r *http.Request) { var v rb.Definition @@ -48,20 +48,64 @@ func (h rbDefinitionHandler) createHandler(w http.ResponseWriter, r *http.Reques http.Error(w, err.Error(), http.StatusUnprocessableEntity) return } + h.createOrUpdateHandler(v, w, false) +} + +// createOrUpdateHandler handles creation of the definition entry in the database +func (h rbDefinitionHandler) updateHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + name := vars["rbname"] + version := vars["rbversion"] + + var v rb.Definition + err := json.NewDecoder(r.Body).Decode(&v) + switch { + case err == io.EOF: + http.Error(w, "Empty body", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + if v.RBVersion != "" && v.RBVersion != version { + http.Error(w, "RB version mismatch", http.StatusBadRequest) + return + } + + if v.RBName != "" && v.RBName != name { + http.Error(w, "RB name mismatch", http.StatusBadRequest) + return + } + + v.RBVersion = version + v.RBName = name + + h.createOrUpdateHandler(v, w, true) +} + +// createOrUpdateHandler handles creation of the definition entry in the database +func (h rbDefinitionHandler) createOrUpdateHandler(v rb.Definition, w http.ResponseWriter, update bool) { // Name is required. if v.RBName == "" { - http.Error(w, "Missing name in POST request", http.StatusBadRequest) + http.Error(w, "Missing name in request", http.StatusBadRequest) return } // Version is required. if v.RBVersion == "" { - http.Error(w, "Missing version in POST request", http.StatusBadRequest) + http.Error(w, "Missing version in request", http.StatusBadRequest) return } - ret, err := h.client.Create(v) + var ret rb.Definition + var err error + if update { + ret, err = h.client.Update(v) + } else { + ret, err = h.client.Create(v) + } if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/src/k8splugin/api/profilehandler.go b/src/k8splugin/api/profilehandler.go index acd23060..1babc4ae 100644 --- a/src/k8splugin/api/profilehandler.go +++ b/src/k8splugin/api/profilehandler.go @@ -56,7 +56,7 @@ func (h rbProfileHandler) createHandler(w http.ResponseWriter, r *http.Request) return } - ret, err := h.client.Create(p) + ret, err := h.client.CreateOrUpdate(p, false) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -127,6 +127,68 @@ func (h rbProfileHandler) getHandler(w http.ResponseWriter, r *http.Request) { } } +// updateHandler updates Profile Key in the database +// Returns an rb.Profile +func (h rbProfileHandler) updateHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + rbName := vars["rbname"] + rbVersion := vars["rbversion"] + prName := vars["prname"] + + ret, err := h.client.Get(rbName, rbVersion, prName) + if err != nil { + // Separate "Not found" from generic DB errors + if strings.Contains(err.Error(), "Error finding") { + http.Error(w, err.Error(), http.StatusNotFound) + return + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + var p rb.Profile + + err = json.NewDecoder(r.Body).Decode(&p) + switch { + case err == io.EOF: + http.Error(w, "Empty body", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + if p.ProfileName != "" && p.ProfileName != ret.ProfileName { + http.Error(w, "Profile name mismatch", http.StatusBadRequest) + return + } + + if p.RBVersion != "" && p.RBVersion != ret.RBVersion { + http.Error(w, "RB version mismatch", http.StatusBadRequest) + return + } + + if p.RBName != "" && p.RBName != ret.RBName { + http.Error(w, "RB name mismatch", http.StatusBadRequest) + return + } + + p.ProfileName = ret.ProfileName + p.RBVersion = ret.RBVersion + p.RBName = ret.RBName + + ret, err = h.client.CreateOrUpdate(p, true) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(ret) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + // getHandler gets all profiles of a Resource Bundle Key in the database // Returns a list of rb.Profile func (h rbProfileHandler) listHandler(w http.ResponseWriter, r *http.Request) { diff --git a/src/k8splugin/api/profilehandler_test.go b/src/k8splugin/api/profilehandler_test.go index 32d0061f..181b775b 100644 --- a/src/k8splugin/api/profilehandler_test.go +++ b/src/k8splugin/api/profilehandler_test.go @@ -42,7 +42,7 @@ type mockRBProfile struct { Err error } -func (m *mockRBProfile) Create(inp rb.Profile) (rb.Profile, error) { +func (m *mockRBProfile) CreateOrUpdate(inp rb.Profile, update bool) (rb.Profile, error) { if m.Err != nil { return rb.Profile{}, m.Err } diff --git a/src/k8splugin/api/queryhandler.go b/src/k8splugin/api/queryhandler.go index 9c11954c..f5950cdf 100644 --- a/src/k8splugin/api/queryhandler.go +++ b/src/k8splugin/api/queryhandler.go @@ -52,7 +52,7 @@ func (i queryHandler) queryHandler(w http.ResponseWriter, r *http.Request) { return } // instance id is irrelevant here - resp, err := i.client.Query(namespace, cloudRegion, apiVersion, kind, name, labels, "query") + resp, err := i.client.Query(namespace, cloudRegion, apiVersion, kind, name, labels) if err != nil { log.Error("Error getting Query results", log.Fields{ "error": err, diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index 9813333e..a2868cd5 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -20,25 +20,30 @@ package app import ( "context" "io/ioutil" + appsv1 "k8s.io/api/apps/v1" + //appsv1beta1 "k8s.io/api/apps/v1beta1" //appsv1beta2 "k8s.io/api/apps/v1beta2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + //extensionsv1beta1 "k8s.io/api/extensions/v1beta1" //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" //apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + logger "log" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin" - logger "log" pkgerrors "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -251,6 +256,7 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names LabelSelector: labelSelector, } var unstrList *unstructured.UnstructuredList + dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts) switch mapping.Scope.Name() { case meta.RESTScopeNameNamespace: unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts) @@ -546,8 +552,16 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac } err = pluginImpl.Delete(resource, namespace, k) + if err != nil { - return pkgerrors.Wrap(err, "Error deleting "+resource.Name) + if strings.Contains(err.Error(), "not found") == false { + return pkgerrors.Wrap(err, "Error deleting "+resource.Name) + } else { + log.Warn("Resource already does not exist", log.Fields{ + "gvk": resource.GVK, + "resource": resource.Name, + }) + } } return nil diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go index 0ba244d2..f51c15fc 100644 --- a/src/k8splugin/internal/app/client_test.go +++ b/src/k8splugin/internal/app/client_test.go @@ -15,13 +15,14 @@ package app import ( "encoding/base64" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "os" "plugin" "reflect" "testing" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go index 94acadcc..8952c16d 100644 --- a/src/k8splugin/internal/app/config.go +++ b/src/k8splugin/internal/app/config.go @@ -18,10 +18,11 @@ package app import ( + "log" "strconv" "strings" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" pkgerrors "github.com/pkg/errors" ) @@ -66,8 +67,10 @@ type ConfigManager interface { Help() map[string]string Update(instanceID, configName string, p Config) (ConfigResult, error) Delete(instanceID, configName string) (ConfigResult, error) - Rollback(instanceID string, p ConfigRollback) error - Tagit(instanceID string, p ConfigTagit) error + DeleteAll(instanceID, configName string) error + Rollback(instanceID string, configName string, p ConfigRollback) error + Cleanup(instanceID string) error + Tagit(instanceID string, configName string, p ConfigTagit) error } // ConfigClient implements the ConfigManager @@ -94,7 +97,7 @@ func (v *ConfigClient) Help() map[string]string { // Create an entry for the config in the database func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) { - + log.Printf("[Config Create] Instance %s", instanceID) // Check required fields if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 { return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") @@ -120,10 +123,12 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "POST") + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, p, profileChannel, "POST", nil) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("POST result: %s", appliedResources) // Create Config DB Entry err = cs.createConfig(p) if err != nil { @@ -132,8 +137,9 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Create Version Entry in DB for Config cvs := ConfigVersionStore{ instanceID: instanceID, + configName: p.ConfigName, } - version, err := cvs.createConfigVersion(p, Config{}, "POST") + version, err := cvs.createConfigVersion(p, Config{}, "POST", appliedResources) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") } @@ -153,7 +159,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) // Update an entry for the config in the database func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigResult, error) { - + log.Printf("[Config Update] Instance %s Config %s", instanceID, configName) // Check required fields if len(p.Values) == 0 { return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided") @@ -176,10 +182,12 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "PUT") + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, p, profileChannel, "PUT", nil) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("PUT result: %s", appliedResources) // Update Config DB Entry configPrev, err := cs.updateConfig(p) if err != nil { @@ -188,8 +196,9 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe // Create Version Entry in DB for Config cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - version, err := cvs.createConfigVersion(p, configPrev, "PUT") + version, err := cvs.createConfigVersion(p, configPrev, "PUT", appliedResources) if err != nil { return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry") } @@ -245,8 +254,49 @@ func (v *ConfigClient) List(instanceID string) ([]Config, error) { } // Delete the Config from database -func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) { +func (v *ConfigClient) DeleteAll(instanceID, configName string) error { + log.Printf("[Config Delete All] Instance %s Config %s", instanceID, configName) + // Check if Config exists + cs := ConfigStore{ + instanceID: instanceID, + configName: configName, + } + _, err := cs.getConfig() + if err != nil { + return pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + } + // Get Version Entry in DB for Config + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + currentVersion, err := cvs.getCurrentVersion(configName) + if err != nil { + return pkgerrors.Wrap(err, "Current version get failed") + } + _, _, action, _, err := cvs.getConfigVersion(configName, currentVersion) + if err != nil { + return pkgerrors.Wrap(err, "Config version get failed") + } + + if action != "DELETE" { + _, err = v.Delete(instanceID, configName) + if err != nil { + return pkgerrors.Wrap(err, "Config DELETE version failed") + } + } + // Delete Config from DB + _, err = cs.deleteConfig() + if err != nil { + return pkgerrors.Wrap(err, "Delete Config DB Entry") + } + cvs.cleanupIstanceTags(configName) + return nil +} +// Apply update with delete operation +func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) { + log.Printf("[Config Delete] Instance %s Config %s", instanceID, configName) // Resolving rbName, Version, etc. not to break response rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) if err != nil { @@ -259,28 +309,39 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro } p, err := cs.getConfig() if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist") + return ConfigResult{}, pkgerrors.Wrap(err, "Delete Error - Config doesn't exist") } lock, profileChannel := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() defer lock.Unlock() - err = applyConfig(instanceID, p, profileChannel, "DELETE") + // Create Version Entry in DB for Config + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + return ConfigResult{}, pkgerrors.Wrap(err, "Current version get failed") } - // Delete Config from DB - configPrev, err := cs.deleteConfig() + _, _, _, resources, err := cvs.getConfigVersion(configName, currentVersion) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry") + return ConfigResult{}, pkgerrors.Wrap(err, "Config version get failed") } - // Create Version Entry in DB for Config - cvs := ConfigVersionStore{ - instanceID: instanceID, + + _, err = applyConfig(instanceID, p, profileChannel, "DELETE", resources) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed") + } + log.Printf("DELETE resources: [%s]", resources) + // Update Config from DB + configPrev, err := cs.updateConfig(p) + if err != nil { + return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry") } - version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE") + version, err := cvs.createConfigVersion(p, configPrev, "DELETE", []helm.KubernetesResource{}) if err != nil { - return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry") + return ConfigResult{}, pkgerrors.Wrap(err, "Create Delete Config Version DB Entry") } // Create Result structure @@ -297,13 +358,13 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro } // Rollback starts from current version and rollbacks to the version desired -func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { - +func (v *ConfigClient) Rollback(instanceID string, configName string, rback ConfigRollback) error { + log.Printf("[Config Rollback] Instance %s Config %s", instanceID, configName) var reqVersion string var err error if rback.AnyOf.ConfigTag != "" { - reqVersion, err = v.GetTagVersion(instanceID, rback.AnyOf.ConfigTag) + reqVersion, err = v.GetTagVersion(instanceID, configName, rback.AnyOf.ConfigTag) if err != nil { return pkgerrors.Wrap(err, "Rollback Invalid tag") } @@ -326,8 +387,9 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Rollback Get Current Config Version ") } @@ -338,40 +400,35 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { //Rollback all the intermettinent configurations for i := currentVersion; i > rollbackIndex; i-- { - configNew, configPrev, action, err := cvs.getConfigVersion(i) + configNew, configPrev, _, resources, err := cvs.getConfigVersion(configName, i) if err != nil { return pkgerrors.Wrap(err, "Rollback Get Config Version") } + _, _, prevAction, _, err := cvs.getConfigVersion(configName, i-1) + if err != nil { + return pkgerrors.Wrap(err, "Rollback Get Prev Config Version") + } cs := ConfigStore{ instanceID: instanceID, configName: configNew.ConfigName, } - if action == "PUT" { - // PUT is proceeded by PUT or POST - err = applyConfig(instanceID, configPrev, profileChannel, "PUT") + if prevAction != "DELETE" { + appliedResources, err := applyConfig(instanceID, configPrev, profileChannel, prevAction, nil) if err != nil { return pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("%s result: %s", prevAction, appliedResources) _, err = cs.updateConfig(configPrev) if err != nil { return pkgerrors.Wrap(err, "Update Config DB Entry") } - } else if action == "POST" { + } else { // POST is always preceeded by Config not existing - err = applyConfig(instanceID, configNew, profileChannel, "DELETE") - if err != nil { - return pkgerrors.Wrap(err, "Delete Config failed") - } - _, err = cs.deleteConfig() - if err != nil { - return pkgerrors.Wrap(err, "Delete Config DB Entry") - } - } else if action == "DELETE" { - // DELETE is proceeded by PUT or POST - err = applyConfig(instanceID, configPrev, profileChannel, "PUT") + _, err := applyConfig(instanceID, configNew, profileChannel, prevAction, resources) if err != nil { return pkgerrors.Wrap(err, "Delete Config failed") } + log.Printf("DELETE resources: %s", resources) _, err = cs.updateConfig(configPrev) if err != nil { return pkgerrors.Wrap(err, "Update Config DB Entry") @@ -380,7 +437,7 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { } for i := currentVersion; i > rollbackIndex; i-- { // Delete rolled back items - err = cvs.deleteConfigVersion() + err = cvs.deleteConfigVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Delete Config Version ") } @@ -389,12 +446,8 @@ func (v *ConfigClient) Rollback(instanceID string, rback ConfigRollback) error { } // Tagit tags the current version with the tag provided -func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error { - - rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) - if err != nil { - return pkgerrors.Wrap(err, "Retrieving model info") - } +func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTagit) error { + log.Printf("[Config Tag It] Instance %s Config %s", instanceID, configName) lock, _ := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() @@ -402,39 +455,54 @@ func (v *ConfigClient) Tagit(instanceID string, tag ConfigTagit) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + err := cvs.tagCurrentVersion(configName, tag.TagName) if err != nil { - return pkgerrors.Wrap(err, "Get Current Config Version ") + return pkgerrors.Wrap(err, "Tag of current version failed") } - tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tag.TagName) + return nil +} - err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) +// GetTagVersion returns the version associated with the tag +func (v *ConfigClient) GetTagVersion(instanceID, configName string, tagName string) (string, error) { + log.Printf("[Config Get Tag Version] Instance %s Config %s", instanceID, configName) + cvs := ConfigVersionStore{ + instanceID: instanceID, + configName: configName, + } + value, err := cvs.getTagVersion(configName, tagName) if err != nil { - return pkgerrors.Wrap(err, "TagIt store DB") + return "", pkgerrors.Wrap(err, "Tag of current version failed") } - return nil + + return value, nil } -// GetTagVersion returns the version associated with the tag -func (v *ConfigClient) GetTagVersion(instanceID, tagName string) (string, error) { +// Cleanup version used only when instance is being deleted. We do not pass errors and we try to delete data +func (v *ConfigClient) Cleanup(instanceID string) error { + log.Printf("[Config Cleanup] Instance %s", instanceID) + configs, err := v.List(instanceID) - rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID) if err != nil { - return "", pkgerrors.Wrap(err, "Retrieving model info") + return pkgerrors.Wrap(err, "Retrieving active config list info") } - tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, tagName) - value, err := db.Etcd.Get(tagKey) - if err != nil { - return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + for _, config := range configs { + err = v.DeleteAll(instanceID, config.ConfigName) + if err != nil { + log.Printf("Config %s delete failed: %s", config.ConfigName, err.Error()) + } } - return string(value), nil + + removeProfileData(instanceID) + + return nil } // ApplyAllConfig starts from first configuration version and applies all versions in sequence -func (v *ConfigClient) ApplyAllConfig(instanceID string) error { - +func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) error { + log.Printf("[Config Apply All] Instance %s Config %s", instanceID, configName) lock, profileChannel := getProfileData(instanceID) // Acquire per profile Mutex lock.Lock() @@ -442,8 +510,9 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error { cvs := ConfigVersionStore{ instanceID: instanceID, + configName: configName, } - currentVersion, err := cvs.getCurrentVersion() + currentVersion, err := cvs.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Current Config Version ") } @@ -453,14 +522,19 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string) error { //Apply all configurations var i uint for i = 1; i <= currentVersion; i++ { - configNew, _, action, err := cvs.getConfigVersion(i) + configNew, _, action, resources, err := cvs.getConfigVersion(configName, i) if err != nil { return pkgerrors.Wrap(err, "Get Config Version") } - err = applyConfig(instanceID, configNew, profileChannel, action) + if action != "DELETE" { + resources = nil + } + var appliedResources ([]helm.KubernetesResource) + appliedResources, err = applyConfig(instanceID, configNew, profileChannel, action, resources) if err != nil { return pkgerrors.Wrap(err, "Apply Config failed") } + log.Printf("%s result: %s", action, appliedResources) } return nil } diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 30a480df..1f22922a 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -38,9 +38,10 @@ import ( //ConfigStore contains the values that will be stored in the database type configVersionDBContent struct { - ConfigNew Config `json:"config-new"` - ConfigPrev Config `json:"config-prev"` - Action string `json:"action"` // CRUD opration for this config + ConfigNew Config `json:"config-new"` + ConfigPrev Config `json:"config-prev"` + Action string `json:"action"` // CRUD opration for this config + Resources []helm.KubernetesResource `json:"resources"` } //ConfigStore to Store the Config @@ -52,11 +53,13 @@ type ConfigStore struct { //ConfigVersionStore to Store the Versions of the Config type ConfigVersionStore struct { instanceID string + configName string } type configResourceList struct { resourceTemplates []helm.KubernetesResourceTemplate - createdResources []helm.KubernetesResource + resources []helm.KubernetesResource + updatedResources chan []helm.KubernetesResource profile rb.Profile action string } @@ -71,6 +74,7 @@ const ( storeName = "config" tagCounter = "counter" tagVersion = "configversion" + tagName = "configtag" tagConfig = "configdata" ) @@ -222,10 +226,46 @@ func (c ConfigStore) deleteConfig() (Config, error) { return configPrev, nil } +//Cleanup stored data in etcd before instance is being deleted +func (c ConfigVersionStore) cleanupIstanceTags(configName string) error { + + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName) + err = db.Etcd.DeletePrefix(versionKey) + if err != nil { + log.Printf("Deleting versions of instance failed: %s", err.Error()) + } + + counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) + err = db.Etcd.DeletePrefix(counterKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName) + err = db.Etcd.DeletePrefix(nameKey) + if err != nil { + log.Printf("Deleting counters of instance failed: %s", err.Error()) + } + + return nil +} + // Create a version for the configuration. If previous config provided that is also stored -func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) { +func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) { - version, err := c.incrementVersion() + configName := "" + if configNew.ConfigName != "" { + configName = configNew.ConfigName + } else { + configName = configPrev.ConfigName + } + + version, err := c.incrementVersion(configName) if err != nil { return 0, pkgerrors.Wrap(err, "Get Next Version") @@ -234,12 +274,14 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac if err != nil { return 0, pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version))) + + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version))) var cs configVersionDBContent cs.Action = action cs.ConfigNew = configNew cs.ConfigPrev = configPrev + cs.Resources = resources //[]helm.KubernetesResource{} configValue, err := db.Serialize(cs) if err != nil { @@ -253,9 +295,9 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac } // Delete current version of the configuration. Configuration always deleted from top -func (c ConfigVersionStore) deleteConfigVersion() error { +func (c ConfigVersionStore) deleteConfigVersion(configName string) error { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Next Version") @@ -264,13 +306,13 @@ func (c ConfigVersionStore) deleteConfigVersion() error { if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(counter))) + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(counter))) err = db.Etcd.Delete(versionKey) if err != nil { return pkgerrors.Wrap(err, "Delete Config DB Entry") } - err = c.decrementVersion() + err = c.decrementVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Decrement Version") } @@ -279,37 +321,37 @@ func (c ConfigVersionStore) deleteConfigVersion() error { // Read the specified version of the configuration and return its prev and current value. // Also returns the action for the config version -func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) { +func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } - versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version))) + versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version))) configBytes, err := db.Etcd.Get(versionKey) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ") } if configBytes != nil { pr := configVersionDBContent{} err = db.DeSerialize(string(configBytes), &pr) if err != nil { - return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version") } - return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil + return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil } - return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ") + return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ") } // Get the counter for the version -func (c ConfigVersionStore) getCurrentVersion() (uint, error) { +func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { return 0, pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) value, err := db.Etcd.Get(cfgKey) if err != nil { @@ -329,13 +371,13 @@ func (c ConfigVersionStore) getCurrentVersion() (uint, error) { } // Update the counter for the version -func (c ConfigVersionStore) updateVersion(counter uint) error { +func (c ConfigVersionStore) updateVersion(configName string, counter uint) error { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) if err != nil { return pkgerrors.Wrap(err, "Retrieving model info") } - cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter) + cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName) err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter))) if err != nil { return pkgerrors.Wrap(err, "Counter DB Entry") @@ -344,15 +386,15 @@ func (c ConfigVersionStore) updateVersion(counter uint) error { } // Increment the version counter -func (c ConfigVersionStore) incrementVersion() (uint, error) { +func (c ConfigVersionStore) incrementVersion(configName string) (uint, error) { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return 0, pkgerrors.Wrap(err, "Get Next Counter Value") } //This is done while Profile lock is taken counter++ - err = c.updateVersion(counter) + err = c.updateVersion(configName, counter) if err != nil { return 0, pkgerrors.Wrap(err, "Store Next Counter Value") } @@ -361,15 +403,15 @@ func (c ConfigVersionStore) incrementVersion() (uint, error) { } // Decrement the version counter -func (c ConfigVersionStore) decrementVersion() error { +func (c ConfigVersionStore) decrementVersion(configName string) error { - counter, err := c.getCurrentVersion() + counter, err := c.getCurrentVersion(configName) if err != nil { return pkgerrors.Wrap(err, "Get Next Counter Value") } //This is done while Profile lock is taken counter-- - err = c.updateVersion(counter) + err = c.updateVersion(configName, counter) if err != nil { return pkgerrors.Wrap(err, "Store Next Counter Value") } @@ -377,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion() error { return nil } +// Get tag version +func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) { + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return "", pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + value, err := db.Etcd.Get(tagKey) + if err != nil { + return "", pkgerrors.Wrap(err, "Config DB Entry Not found") + } + return string(value), nil +} + +// Tag current version +func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error { + currentVersion, err := c.getCurrentVersion(configName) + if err != nil { + return pkgerrors.Wrap(err, "Get Current Config Version ") + } + rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) + if err != nil { + return pkgerrors.Wrap(err, "Retrieving model info") + } + tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue) + + err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion))) + if err != nil { + return pkgerrors.Wrap(err, "TagIt store DB") + } + return nil +} + // Apply Config -func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error { +func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) { rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID) if err != nil { - return pkgerrors.Wrap(err, "Retrieving model info") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info") } // Get Template and Resolve the template with values crl, err := resolve(rbName, rbVersion, profileName, p, releaseName) if err != nil { - return pkgerrors.Wrap(err, "Resolve Config") + return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config") } + var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource) crl.action = action + crl.resources = resources + crl.updatedResources = updatedResources // Send the configResourceList to the channel. Using select for non-blocking channel + log.Printf("Before Sent to goroutine %v", crl.profile) select { case pChannel <- crl: log.Printf("Message Sent to goroutine %v", crl.profile) default: } - return nil + var resultResources []helm.KubernetesResource = <-updatedResources + return resultResources, nil } // Per Profile Go routine to apply the configuration to Cloud Region func scheduleResources(c chan configResourceList) { // Keep thread running + log.Printf("[scheduleResources]: START thread") for { data := <-c //TODO: ADD Check to see if Application running ic := NewInstanceClient() resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) - if err != nil || len(resp) == 0 { + if (err != nil || len(resp) == 0) && data.action != "STOP" { log.Println("Error finding a running instance. Retrying later...") - time.Sleep(time.Second * 10) + data.updatedResources <- []helm.KubernetesResource{} continue } + breakThread := false switch { case data.action == "POST": log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates) + var resources []helm.KubernetesResource for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -425,11 +509,11 @@ func scheduleResources(c chan configResourceList) { continue } //assuming - the resource is not exist already - data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace) errCreate := err if err != nil { // assuming - the err represent the resource is already exist, so going for update - data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) if err != nil { log.Printf("Error Creating resources: %s", errCreate.Error()) log.Printf("Error Updating resources: %s", err.Error()) @@ -437,12 +521,28 @@ func scheduleResources(c chan configResourceList) { } } } - //TODO: Needs to add code to call Kubectl create + data.updatedResources <- resources case data.action == "PUT": log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates) - //TODO: Needs to add code to call Kubectl apply + var resources []helm.KubernetesResource + for _, inst := range resp { + k8sClient := KubernetesClient{} + err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) + if err != nil { + log.Printf("Getting CloudRegion Information: %s", err.Error()) + //Move onto the next cloud region + continue + } + + resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace) + if err != nil { + log.Printf("Error Updating resources: %s", err.Error()) + continue + } + } + data.updatedResources <- resources case data.action == "DELETE": - log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates) + log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources) for _, inst := range resp { k8sClient := KubernetesClient{} err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) @@ -451,14 +551,22 @@ func scheduleResources(c chan configResourceList) { //Move onto the next cloud region continue } - err = k8sClient.deleteResources(data.createdResources, inst.Namespace) + err = k8sClient.deleteResources(helm.GetReverseK8sResources(data.resources), inst.Namespace) if err != nil { log.Printf("Error Deleting resources: %s", err.Error()) continue } } + data.updatedResources <- []helm.KubernetesResource{} + + case data.action == "STOP": + breakThread = true + } + if breakThread { + break } } + log.Printf("[scheduleResources]: STOP thread") } //Resolve returns the path where the helm chart merged with @@ -523,14 +631,18 @@ var resolve = func(rbName, rbVersion, profileName string, p Config, releaseName finalReleaseName) chartPath := filepath.Join(chartBasePath, t.ChartName) - resTemplates, _, err = helmClient.GenerateKubernetesArtifacts(chartPath, + resTemplates, crdList, _, err := helmClient.GenerateKubernetesArtifacts(chartPath, []string{outputfile.Name()}, nil) if err != nil { return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml") } + for _, tmp := range resTemplates { + crdList = append(crdList, tmp) + } + crl := configResourceList{ - resourceTemplates: resTemplates, + resourceTemplates: crdList, profile: profile, } @@ -549,6 +661,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) { if !ok { profileData.resourceChannel[key] = make(chan configResourceList) go scheduleResources(profileData.resourceChannel[key]) + time.Sleep(time.Second * 5) } return profileData.profileLockMap[key], profileData.resourceChannel[key] } + +func removeProfileData(key string) { + profileData.Lock() + defer profileData.Unlock() + _, ok := profileData.profileLockMap[key] + if ok { + delete(profileData.profileLockMap, key) + } + _, ok = profileData.resourceChannel[key] + if ok { + log.Printf("Stop config thread for %s", key) + crl := configResourceList{ + action: "STOP", + } + profileData.resourceChannel[key] <- crl + delete(profileData.resourceChannel, key) + } +} diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go index 9ee96881..0cc3c3ce 100644 --- a/src/k8splugin/internal/app/config_test.go +++ b/src/k8splugin/internal/app/config_test.go @@ -293,7 +293,7 @@ func TestRollbackConfig(t *testing.T) { } } testCase.rollbackConfig.AnyOf.ConfigVersion = "2" - err = impl.Rollback(testCase.instanceID, testCase.rollbackConfig) + err = impl.Rollback(testCase.instanceID, testCase.inp.ConfigName, testCase.rollbackConfig) if err != nil { if testCase.expectedError == "" { t.Fatalf("Create returned an unexpected error %s", err) @@ -319,3 +319,6 @@ func TestRollbackConfig(t *testing.T) { }) } } + +func main() { +} diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index ad36aaa5..e50a59e5 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -30,11 +30,11 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator" @@ -225,7 +225,7 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { } //Execute the kubernetes create command - sortedTemplates, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName) + sortedTemplates, crdList, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Error resolving helm charts") } @@ -245,6 +245,12 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { log.Printf(" Kind: %s", t.GVK.Kind) } + log.Printf("Crd rss info") + for _, t := range crdList { + log.Printf(" Path: %s", t.FilePath) + log.Printf(" Kind: %s", t.GVK.Kind) + } + log.Printf("Hook info") for _, h := range hookList { log.Printf(" Name: %s", h.Hook.Name) @@ -280,6 +286,15 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") } + if len(crdList) > 0 { + log.Printf("Pre-Installing CRDs") + _, err = k8sClient.createResources(crdList, profile.Namespace) + + if err != nil { + return InstanceResponse{}, pkgerrors.Wrap(err, "Pre-Installing CRDs") + } + } + hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst) if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 { err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, preInstallTimeOut, 0, &dbData) @@ -308,7 +323,7 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) { if err != nil { if len(createdResources) > 0 { log.Printf("[Instance] Reverting created resources on Error: %s", err.Error()) - k8sClient.deleteResources(createdResources, profile.Namespace) + k8sClient.deleteResources(helm.GetReverseK8sResources(createdResources), profile.Namespace) } log.Printf(" Instance: %s, Main rss are failed, skip post-install and remove instance in DB", id) //main rss creation failed -> remove instance in DB @@ -447,7 +462,15 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value") } - resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id) + if labels == "" || strings.Contains(strings.ToLower(labels), config.GetConfiguration().KubernetesLabelName) == false { + labelValue := config.GetConfiguration().KubernetesLabelName + "=" + id + if labels != "" { + labels = labels + "," + } + labels = labels + labelValue + } + + resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") } @@ -488,6 +511,11 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) { if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } + req := resResp.Request + profile, err := rb.NewProfileClient().Get(req.RBName, req.RBVersion, req.ProfileName) + if err != nil { + return InstanceStatus{}, pkgerrors.New("Unable to find Profile instance status") + } cumulatedErrorMsg := make([]string, 0) podsStatus, err := k8sClient.getPodsByLabel(resResp.Namespace) @@ -520,12 +548,36 @@ Main: } } } + generalStatus = append(generalStatus, podsStatus...) + + if profile.ExtraResourceTypes != nil && len(profile.ExtraResourceTypes) > 0 { + queryClient := NewQueryClient() + labelValue := config.GetConfiguration().KubernetesLabelName + "=" + id + for _, extraType := range profile.ExtraResourceTypes { + queryStatus, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, extraType.GroupVersion().Identifier(), extraType.Kind, "", labelValue) + if err != nil { + return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") + } + for _, rs := range queryStatus.ResourcesStatus { + foundRes := false + for _, res := range generalStatus { + if res.GVK == rs.GVK && res.Name == rs.Name { + foundRes = true + break + } + } + if !foundRes { + generalStatus = append(generalStatus, rs) + } + } + } + } //We still need to iterate through rss list even the status is not DONE, to gather status of rss + pod for the response resp := InstanceStatus{ Request: resResp.Request, - ResourceCount: int32(len(generalStatus) + len(podsStatus)), + ResourceCount: int32(len(generalStatus)), Ready: isReady && resResp.Status == "DONE", - ResourcesStatus: append(generalStatus, podsStatus...), + ResourcesStatus: generalStatus, } if len(cumulatedErrorMsg) != 0 { @@ -561,8 +613,6 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K parsedRes = new(corev1.Service) case "DaemonSet": parsedRes = new(appsv1.DaemonSet) - case "CustomResourceDefinition": - parsedRes = new(apiextv1.CustomResourceDefinition) case "StatefulSet": parsedRes = new(appsv1.StatefulSet) case "ReplicationController": @@ -709,7 +759,8 @@ func (v *InstanceClient) Delete(id string) error { return nil } else if inst.Status != "DONE" { //Recover is ongoing, do nothing here - return nil + //return nil + //TODO: implement recovery } k8sClient := KubernetesClient{} @@ -717,6 +768,7 @@ func (v *InstanceClient) Delete(id string) error { if err != nil { return pkgerrors.Wrap(err, "Getting CloudRegion Information") } + inst.Status = "PRE-DELETE" inst.HookProgress = "" err = db.DBconn.Update(v.storeName, key, v.tagInst, inst) @@ -743,7 +795,14 @@ func (v *InstanceClient) Delete(id string) error { if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName) } - err = k8sClient.deleteResources(inst.Resources, inst.Namespace) + + configClient := NewConfigClient() + err = configClient.Cleanup(id) + if err != nil { + return pkgerrors.Wrap(err, "Cleanup Config Resources") + } + + err = k8sClient.deleteResources(helm.GetReverseK8sResources(inst.Resources), inst.Namespace) if err != nil { return pkgerrors.Wrap(err, "Deleting Instance Resources") } @@ -782,7 +841,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(id string) error { ID: id, } log.Printf(" Resolving template for release %s", instance.Request.ReleaseName) - _, hookList, _, err := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName) + _, _, hookList, _, err := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName) instance.Hooks = hookList err = db.DBconn.Update(v.storeName, key, v.tagInst, instance) if err != nil { @@ -851,7 +910,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(id string) error { return } - err = k8sClient.deleteResources(instance.Resources, instance.Namespace) + err = k8sClient.deleteResources(helm.GetReverseK8sResources(instance.Resources), instance.Namespace) if err != nil { log.Printf(" Error running deleting instance resources, error: %s", err) return diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go index cb645afd..251b14e6 100644 --- a/src/k8splugin/internal/app/query.go +++ b/src/k8splugin/internal/app/query.go @@ -33,7 +33,7 @@ type QueryStatus struct { // QueryManager is an interface exposes the instantiation functionality type QueryManager interface { - Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error) + Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) } // QueryClient implements the InstanceManager interface @@ -53,12 +53,12 @@ func NewQueryClient() *QueryClient { } // Query returns state of instance's filtered resources -func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error) { +func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) { //Read the status from the DD k8sClient := KubernetesClient{} - err := k8sClient.Init(cloudRegion, id) + err := k8sClient.Init(cloudRegion, "dummy") //we don't care about instance id in this request if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } diff --git a/src/k8splugin/internal/db/etcd.go b/src/k8splugin/internal/db/etcd.go index a435b435..e455cc1a 100644 --- a/src/k8splugin/internal/db/etcd.go +++ b/src/k8splugin/internal/db/etcd.go @@ -39,6 +39,7 @@ type EtcdStore interface { GetAll(key string) ([][]byte, error) Put(key, value string) error Delete(key string) error + DeletePrefix(keyPrefix string) error } // EtcdClient for Etcd @@ -151,3 +152,16 @@ func (e EtcdClient) Delete(key string) error { } return nil } + +// Delete values by prefix from Etcd DB +func (e EtcdClient) DeletePrefix(keyPrefix string) error { + + if e.cli == nil { + return pkgerrors.Errorf("Etcd Client not initialized") + } + _, err := e.cli.Delete(context.Background(), keyPrefix, clientv3.WithPrefix()) + if err != nil { + return pkgerrors.Errorf("Delete prefix failed etcd entry:%s", err.Error()) + } + return nil +} diff --git a/src/k8splugin/internal/db/etcd_testing.go b/src/k8splugin/internal/db/etcd_testing.go index 9dfcad82..4b4dfe3e 100644 --- a/src/k8splugin/internal/db/etcd_testing.go +++ b/src/k8splugin/internal/db/etcd_testing.go @@ -55,3 +55,12 @@ func (c *MockEtcdClient) Delete(key string) error { delete(c.Items, key) return c.Err } + +func (c *MockEtcdClient) DeletePrefix(key string) error { + for kvKey := range c.Items { + if strings.HasPrefix(kvKey, key) { + delete(c.Items, key) + } + } + return c.Err +} diff --git a/src/k8splugin/internal/db/mongo.go b/src/k8splugin/internal/db/mongo.go index c15b108b..aa05820a 100644 --- a/src/k8splugin/internal/db/mongo.go +++ b/src/k8splugin/internal/db/mongo.go @@ -17,9 +17,10 @@ package db import ( - "golang.org/x/net/context" "log" + "golang.org/x/net/context" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" pkgerrors "github.com/pkg/errors" @@ -374,7 +375,7 @@ func (m *MongoStore) ReadAll(coll, tag string) (map[string][]byte, error) { //Get objectID of tag document tid, ok := d.Lookup(tag).ObjectIDOK() if !ok { - log.Printf("Did not find tag: %s", tag) + //"Did not find tag: %s", tag) continue } diff --git a/src/k8splugin/internal/helm/helm.go b/src/k8splugin/internal/helm/helm.go index 849674a9..b27c8aee 100644 --- a/src/k8splugin/internal/helm/helm.go +++ b/src/k8splugin/internal/helm/helm.go @@ -19,13 +19,14 @@ package helm import ( "fmt" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "io/ioutil" "os" "path/filepath" "regexp" "strings" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + pkgerrors "github.com/pkg/errors" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart/loader" @@ -47,7 +48,7 @@ type Template interface { GenerateKubernetesArtifacts( chartPath string, valueFiles []string, - values []string) ([]KubernetesResourceTemplate, []*Hook, error) + values []string) ([]KubernetesResourceTemplate, []KubernetesResourceTemplate, []*Hook, error) } // TemplateClient implements the Template interface @@ -90,10 +91,11 @@ func (h *TemplateClient) processValues(valueFiles []string, values []string) (ma // GenerateKubernetesArtifacts a mapping of type to fully evaluated helm template func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFiles []string, - values []string) ([]KubernetesResourceTemplate, []*Hook, error) { + values []string) ([]KubernetesResourceTemplate, []KubernetesResourceTemplate, []*Hook, error) { var outputDir, chartPath, namespace, releaseName string var retData []KubernetesResourceTemplate + var crdData []KubernetesResourceTemplate var hookList []*Hook releaseName = h.releaseName @@ -102,16 +104,16 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile // verify chart path exists if _, err := os.Stat(inputPath); err == nil { if chartPath, err = filepath.Abs(inputPath); err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } } else { - return retData, hookList, err + return retData, crdData, hookList, err } //Create a temp directory in the system temp folder outputDir, err := ioutil.TempDir("", "helm-tmpl-") if err != nil { - return retData, hookList, pkgerrors.Wrap(err, "Got error creating temp dir") + return retData, crdData, hookList, pkgerrors.Wrap(err, "Got error creating temp dir") } if namespace == "" { @@ -121,11 +123,11 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile // get combined values and create config rawVals, err := h.processValues(valueFiles, values) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } if msgs := validation.IsDNS1123Label(releaseName); releaseName != "" && len(msgs) > 0 { - return retData, hookList, fmt.Errorf("release name %s is not a valid DNS label: %s", releaseName, strings.Join(msgs, ";")) + return retData, crdData, hookList, fmt.Errorf("release name %s is not a valid DNS label: %s", releaseName, strings.Join(msgs, ";")) } // Initialize the install client @@ -133,27 +135,52 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile client.DryRun = true client.ClientOnly = true client.ReleaseName = releaseName - client.IncludeCRDs = true + client.IncludeCRDs = false client.DisableHooks = true //to ensure no duplicates in case of defined pre/post install hooks // Check chart dependencies to make sure all are present in /charts chartRequested, err := loader.Load(chartPath) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } if chartRequested.Metadata.Type != "" && chartRequested.Metadata.Type != "application" { - return retData, hookList, fmt.Errorf( + return retData, crdData, hookList, fmt.Errorf( "chart %q has an unsupported type and is not installable: %q", chartRequested.Metadata.Name, chartRequested.Metadata.Type, ) } + for _, crd := range chartRequested.CRDObjects() { + if strings.HasPrefix(crd.Name, "_") { + continue + } + filePath := filepath.Join(outputDir, crd.Name) + data := string(crd.File.Data) + // blank template after execution + if h.emptyRegex.MatchString(data) { + continue + } + utils.EnsureDirectory(filePath) + err = ioutil.WriteFile(filePath, []byte(crd.File.Data), 0600) + if err != nil { + return retData, crdData, hookList, err + } + gvk, err := getGroupVersionKind(data) + if err != nil { + return retData, crdData, hookList, err + } + kres := KubernetesResourceTemplate{ + GVK: gvk, + FilePath: filePath, + } + crdData = append(crdData, kres) + } client.Namespace = namespace release, err := client.Run(chartRequested, rawVals) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } // SplitManifests returns integer-sortable so that manifests get output // in the same order as the input by `BySplitManifestsOrder`. @@ -161,7 +188,7 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile // We won't get any meaningful hooks from here _, m, err := releaseutil.SortManifests(rmap, nil, releaseutil.InstallOrder) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } for _, k := range m { data := k.Content @@ -180,11 +207,11 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile utils.EnsureDirectory(mfilePath) err = ioutil.WriteFile(mfilePath, []byte(k.Content), 0600) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } gvk, err := getGroupVersionKind(data) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } kres := KubernetesResourceTemplate{ GVK: gvk, @@ -197,15 +224,15 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile utils.EnsureDirectory(hFilePath) err = ioutil.WriteFile(hFilePath, []byte(h.Manifest), 0600) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } gvk, err := getGroupVersionKind(h.Manifest) if err != nil { - return retData, hookList, err + return retData, crdData, hookList, err } hookList = append(hookList, &Hook{*h, KubernetesResourceTemplate{gvk, hFilePath}}) } - return retData, hookList, nil + return retData, crdData, hookList, nil } func getGroupVersionKind(data string) (schema.GroupVersionKind, error) { @@ -222,3 +249,13 @@ func getGroupVersionKind(data string) (schema.GroupVersionKind, error) { return *gvk, nil } + +//GetReverseK8sResources reverse list of resources for delete purpose +func GetReverseK8sResources(resources []KubernetesResource) []KubernetesResource { + reversed := []KubernetesResource{} + + for i := len(resources) - 1; i >= 0; i-- { + reversed = append(reversed, resources[i]) + } + return reversed +} diff --git a/src/k8splugin/internal/helm/helm_test.go b/src/k8splugin/internal/helm/helm_test.go index 29d446fa..951ff92b 100644 --- a/src/k8splugin/internal/helm/helm_test.go +++ b/src/k8splugin/internal/helm/helm_test.go @@ -20,11 +20,13 @@ package helm import ( "crypto/sha256" "fmt" - "gopkg.in/yaml.v2" "io/ioutil" "path/filepath" "strings" "testing" + + "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/runtime/schema" ) func TestProcessValues(t *testing.T) { @@ -202,7 +204,7 @@ func TestGenerateKubernetesArtifacts(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { tc := NewTemplateClient("1.12.3", "testnamespace", "testreleasename") - out, hooks, err := tc.GenerateKubernetesArtifacts(testCase.chartPath, testCase.valueFiles, + out, _, hooks, err := tc.GenerateKubernetesArtifacts(testCase.chartPath, testCase.valueFiles, testCase.values) if err != nil { if testCase.expectedError == "" { @@ -264,3 +266,45 @@ func TestGenerateKubernetesArtifacts(t *testing.T) { }) } } + +func TestReverseResources(t *testing.T) { + + t.Run("Successfully reverse resources", func(t *testing.T) { + data := []KubernetesResource{ + { + GVK: schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment"}, + Name: "deployment-1", + }, + { + GVK: schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment"}, + Name: "deployment-2", + }, + { + GVK: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service"}, + Name: "service-1", + }, + { + GVK: schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service"}, + Name: "service-2", + }, + } + + reversed := GetReverseK8sResources(data) + + if reversed[0] != data[len(data)-1] { + t.Fatalf("Unexpected k8s resource at position 0 %s", reversed[0]) + } + }) +} diff --git a/src/k8splugin/internal/rb/config_template.go b/src/k8splugin/internal/rb/config_template.go index cf45a6f2..97fe0fb4 100644 --- a/src/k8splugin/internal/rb/config_template.go +++ b/src/k8splugin/internal/rb/config_template.go @@ -20,14 +20,16 @@ import ( "bytes" "encoding/json" "io/ioutil" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "os" "path/filepath" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "encoding/base64" - pkgerrors "github.com/pkg/errors" "log" + + pkgerrors "github.com/pkg/errors" ) // ConfigTemplate contains the parameters needed for ConfigTemplates @@ -39,8 +41,9 @@ type ConfigTemplate struct { // ConfigTemplateManager is an interface exposes the resource bundle ConfigTemplate functionality type ConfigTemplateManager interface { - Create(rbName, rbVersion string, p ConfigTemplate) error + CreateOrUpdate(rbName, rbVersion string, p ConfigTemplate, update bool) error Get(rbName, rbVersion, templateName string) (ConfigTemplate, error) + List(rbName, rbVersion string) ([]ConfigTemplate, error) Delete(rbName, rbVersion, templateName string) error Upload(rbName, rbVersion, templateName string, inp []byte) error } @@ -76,13 +79,13 @@ type ConfigTemplateClient struct { func NewConfigTemplateClient() *ConfigTemplateClient { return &ConfigTemplateClient{ storeName: "rbdef", - tagMeta: "metadata", - tagContent: "content", + tagMeta: "confdefmetadata", + tagContent: "confdefcontent", } } -// Create an entry for the resource bundle ConfigTemplate in the database -func (v *ConfigTemplateClient) Create(rbName, rbVersion string, p ConfigTemplate) error { +// CreateOrUpdate an entry for the resource bundle ConfigTemplate in the database +func (v *ConfigTemplateClient) CreateOrUpdate(rbName, rbVersion string, p ConfigTemplate, update bool) error { log.Printf("[ConfigiTemplate]: create %s", rbName) // Name is required @@ -92,9 +95,12 @@ func (v *ConfigTemplateClient) Create(rbName, rbVersion string, p ConfigTemplate //Check if ConfigTemplate already exists _, err := v.Get(rbName, rbVersion, p.TemplateName) - if err == nil { + if err == nil && !update { return pkgerrors.New(" ConfigTemplate already exists for this Definition") } + if err != nil && update { + return pkgerrors.New(" ConfigTemplate does not exist for this Definition") + } //Check if provided resource bundle information is valid _, err = NewDefinitionClient().Get(rbName, rbVersion) @@ -108,9 +114,16 @@ func (v *ConfigTemplateClient) Create(rbName, rbVersion string, p ConfigTemplate TemplateName: p.TemplateName, } - err = db.DBconn.Create(v.storeName, key, v.tagMeta, p) - if err != nil { - return pkgerrors.Wrap(err, "Creating ConfigTemplate DB Entry") + if update { + err = db.DBconn.Update(v.storeName, key, v.tagMeta, p) + if err != nil { + return pkgerrors.Wrap(err, "Updating ConfigTemplate DB Entry") + } + } else { + err = db.DBconn.Create(v.storeName, key, v.tagMeta, p) + if err != nil { + return pkgerrors.Wrap(err, "Creating ConfigTemplate DB Entry") + } } return nil @@ -141,6 +154,44 @@ func (v *ConfigTemplateClient) Get(rbName, rbVersion, templateName string) (Conf return ConfigTemplate{}, pkgerrors.New("Error getting ConfigTemplate") } +// List returns the Resource Bundle ConfigTemplate for corresponding ID +func (v *ConfigTemplateClient) List(rbName, rbVersion string) ([]ConfigTemplate, error) { + + //Get all config templates + dbres, err := db.DBconn.ReadAll(v.storeName, v.tagMeta) + if err != nil || len(dbres) == 0 { + return []ConfigTemplate{}, pkgerrors.Wrap(err, "No Config Templates Found") + } + + var results []ConfigTemplate + for key, value := range dbres { + //value is a byte array + if value != nil { + tmp := ConfigTemplate{} + err = db.DBconn.Unmarshal(value, &tmp) + if err != nil { + log.Printf("[ConfigTemplate] Error: %s Unmarshaling value for: %s", err.Error(), key) + continue + } + keyTmp := ConfigTemplateKey{ + RBName: rbName, + RBVersion: rbVersion, + TemplateName: tmp.TemplateName, + } + _, err := db.DBconn.Read(v.storeName, keyTmp, v.tagMeta) + if err == nil && keyTmp.RBName == rbName && keyTmp.RBVersion == rbVersion { + results = append(results, tmp) + } + } + } + + if len(results) == 0 { + return results, pkgerrors.New("No Config Templates Found for Definition and Version") + } + + return results, nil +} + // Delete the Resource Bundle ConfigTemplate from database func (v *ConfigTemplateClient) Delete(rbName, rbVersion, templateName string) error { key := ConfigTemplateKey{ diff --git a/src/k8splugin/internal/rb/definition.go b/src/k8splugin/internal/rb/definition.go index 73ea44da..aa76afaa 100644 --- a/src/k8splugin/internal/rb/definition.go +++ b/src/k8splugin/internal/rb/definition.go @@ -61,6 +61,7 @@ func (dk DefinitionKey) String() string { // DefinitionManager is an interface exposes the resource bundle definition functionality type DefinitionManager interface { Create(def Definition) (Definition, error) + Update(def Definition) (Definition, error) List(name string) ([]Definition, error) Get(name string, version string) (Definition, error) Delete(name string, version string) error @@ -104,13 +105,13 @@ func (v *DefinitionClient) Create(def Definition) (Definition, error) { // Create a default profile automatically prc := NewProfileClient() - pr, err := prc.Create(Profile{ + pr, err := prc.CreateOrUpdate(Profile{ RBName: def.RBName, RBVersion: def.RBVersion, ProfileName: "default", Namespace: "default", ReleaseName: "default", - }) + }, false) if err != nil { logutils.Error("Create Default Profile", logutils.Fields{ @@ -139,6 +140,26 @@ func (v *DefinitionClient) Create(def Definition) (Definition, error) { return def, nil } +// Update an entry for the resource in the database` +func (v *DefinitionClient) Update(def Definition) (Definition, error) { + + //Construct composite key consisting of name and version + key := DefinitionKey{RBName: def.RBName, RBVersion: def.RBVersion} + + //Check if this definition already exists + _, err := v.Get(def.RBName, def.RBVersion) + if err != nil { + return Definition{}, pkgerrors.New("Definition does not exists") + } + + err = db.DBconn.Update(v.storeName, key, v.tagMeta, def) + if err != nil { + return Definition{}, pkgerrors.Wrap(err, "Updating DB Entry") + } + + return def, nil +} + // List all resource entry's versions in the database func (v *DefinitionClient) List(name string) ([]Definition, error) { res, err := db.DBconn.ReadAll(v.storeName, v.tagMeta) diff --git a/src/k8splugin/internal/rb/definition_test.go b/src/k8splugin/internal/rb/definition_test.go index 0140b459..42fb5374 100644 --- a/src/k8splugin/internal/rb/definition_test.go +++ b/src/k8splugin/internal/rb/definition_test.go @@ -18,12 +18,13 @@ package rb import ( "bytes" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "reflect" "sort" "strings" "testing" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + pkgerrors "github.com/pkg/errors" ) diff --git a/src/k8splugin/internal/rb/profile.go b/src/k8splugin/internal/rb/profile.go index 3db6c40f..77398580 100644 --- a/src/k8splugin/internal/rb/profile.go +++ b/src/k8splugin/internal/rb/profile.go @@ -26,6 +26,7 @@ import ( "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + "k8s.io/apimachinery/pkg/runtime/schema" pkgerrors "github.com/pkg/errors" ) @@ -33,18 +34,19 @@ import ( // Profile contains the parameters needed for resource bundle (rb) profiles // It implements the interface for managing the profiles type Profile struct { - RBName string `json:"rb-name"` - RBVersion string `json:"rb-version"` - ProfileName string `json:"profile-name"` - ReleaseName string `json:"release-name"` - Namespace string `json:"namespace"` - KubernetesVersion string `json:"kubernetes-version"` - Labels map[string]string `json:"labels"` + RBName string `json:"rb-name"` + RBVersion string `json:"rb-version"` + ProfileName string `json:"profile-name"` + ReleaseName string `json:"release-name"` + Namespace string `json:"namespace"` + KubernetesVersion string `json:"kubernetes-version"` + Labels map[string]string `json:"labels"` + ExtraResourceTypes []schema.GroupVersionKind `json:"extra-resource-types"` } // ProfileManager is an interface exposes the resource bundle profile functionality type ProfileManager interface { - Create(def Profile) (Profile, error) + CreateOrUpdate(def Profile, update bool) (Profile, error) Get(rbName, rbVersion, prName string) (Profile, error) List(rbName, rbVersion string) ([]Profile, error) Delete(rbName, rbVersion, prName string) error @@ -87,8 +89,8 @@ func NewProfileClient() *ProfileClient { } } -// Create an entry for the resource bundle profile in the database -func (v *ProfileClient) Create(p Profile) (Profile, error) { +// CreateOrUpdate an entry for the resource bundle profile in the database +func (v *ProfileClient) CreateOrUpdate(p Profile, update bool) (Profile, error) { // Name is required if p.ProfileName == "" { @@ -97,10 +99,12 @@ func (v *ProfileClient) Create(p Profile) (Profile, error) { //Check if profile already exists _, err := v.Get(p.RBName, p.RBVersion, p.ProfileName) - if err == nil { + if err == nil && !update { return Profile{}, pkgerrors.New("Profile already exists for this Definition") } - + if err != nil && update { + return Profile{}, pkgerrors.New("Profile does not exists for this Definition") + } //Check if provided resource bundle information is valid _, err = NewDefinitionClient().Get(p.RBName, p.RBVersion) if err != nil { @@ -118,9 +122,16 @@ func (v *ProfileClient) Create(p Profile) (Profile, error) { ProfileName: p.ProfileName, } - err = db.DBconn.Create(v.storeName, key, v.tagMeta, p) - if err != nil { - return Profile{}, pkgerrors.Wrap(err, "Creating Profile DB Entry") + if update { + err = db.DBconn.Update(v.storeName, key, v.tagMeta, p) + if err != nil { + return Profile{}, pkgerrors.Wrap(err, "Updating Profile DB Entry") + } + } else { + err = db.DBconn.Create(v.storeName, key, v.tagMeta, p) + if err != nil { + return Profile{}, pkgerrors.Wrap(err, "Creating Profile DB Entry") + } } return p, nil @@ -271,9 +282,10 @@ func (v *ProfileClient) Download(rbName, rbVersion, prName string) ([]byte, erro //Resolve returns the path where the helm chart merged with //configuration overrides resides and final ReleaseName picked for instantiation func (v *ProfileClient) Resolve(rbName string, rbVersion string, - profileName string, values []string, overrideReleaseName string) ([]helm.KubernetesResourceTemplate, []*helm.Hook, string, error) { + profileName string, values []string, overrideReleaseName string) ([]helm.KubernetesResourceTemplate, []helm.KubernetesResourceTemplate, []*helm.Hook, string, error) { var sortedTemplates []helm.KubernetesResourceTemplate + var crdList []helm.KubernetesResourceTemplate var hookList []*helm.Hook var finalReleaseName string @@ -281,40 +293,40 @@ func (v *ProfileClient) Resolve(rbName string, rbVersion string, //If everything seems okay, then download the definition prData, err := v.Download(rbName, rbVersion, profileName) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Downloading Profile") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Downloading Profile") } prPath, err := ExtractTarBall(bytes.NewBuffer(prData)) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Extracting Profile Content") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Extracting Profile Content") } prYamlClient, err := ProcessProfileYaml(prPath, v.manifestName) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Processing Profile Manifest") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Processing Profile Manifest") } definitionClient := NewDefinitionClient() definition, err := definitionClient.Get(rbName, rbVersion) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Getting Definition Metadata") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Getting Definition Metadata") } defData, err := definitionClient.Download(rbName, rbVersion) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Downloading Definition") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Downloading Definition") } chartBasePath, err := ExtractTarBall(bytes.NewBuffer(defData)) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Extracting Definition Charts") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Extracting Definition Charts") } //Get the definition ID and download its contents profile, err := v.Get(rbName, rbVersion, profileName) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Getting Profile") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Getting Profile") } //Copy the profile configresources to the chart locations @@ -324,7 +336,7 @@ func (v *ProfileClient) Resolve(rbName string, rbVersion string, // chartpath: chart/config/resources/config.yaml err = prYamlClient.CopyConfigurationOverrides(chartBasePath) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Copying configresources to chart") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Copying configresources to chart") } if overrideReleaseName == "" { @@ -338,14 +350,14 @@ func (v *ProfileClient) Resolve(rbName string, rbVersion string, finalReleaseName) chartPath := filepath.Join(chartBasePath, definition.ChartName) - sortedTemplates, hookList, err = helmClient.GenerateKubernetesArtifacts(chartPath, + sortedTemplates, crdList, hookList, err = helmClient.GenerateKubernetesArtifacts(chartPath, []string{prYamlClient.GetValues()}, values) if err != nil { - return sortedTemplates, hookList, finalReleaseName, pkgerrors.Wrap(err, "Generate final k8s yaml") + return sortedTemplates, crdList, hookList, finalReleaseName, pkgerrors.Wrap(err, "Generate final k8s yaml") } - return sortedTemplates, hookList, finalReleaseName, nil + return sortedTemplates, crdList, hookList, finalReleaseName, nil } // Returns an empty profile with the following contents diff --git a/src/k8splugin/internal/rb/profile_test.go b/src/k8splugin/internal/rb/profile_test.go index 3c40c2c9..e52897ce 100644 --- a/src/k8splugin/internal/rb/profile_test.go +++ b/src/k8splugin/internal/rb/profile_test.go @@ -105,7 +105,7 @@ func TestCreateProfile(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { db.DBconn = testCase.mockdb impl := NewProfileClient() - got, err := impl.Create(testCase.inp) + got, err := impl.CreateOrUpdate(testCase.inp, false) if err != nil { if testCase.expectedError == "" { t.Fatalf("Create returned an unexpected error %s", err) @@ -773,7 +773,7 @@ func TestResolveProfile(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { db.DBconn = testCase.mockdb impl := NewProfileClient() - data, _, releaseName, err := impl.Resolve(testCase.rbname, + data, _, _, releaseName, err := impl.Resolve(testCase.rbname, testCase.rbversion, testCase.prname, []string{}, testCase.releaseName) defer cleanup(data) if err != nil { diff --git a/src/k8splugin/plugins/generic/plugin.go b/src/k8splugin/plugins/generic/plugin.go index f71c436c..a210f6d6 100644 --- a/src/k8splugin/plugins/generic/plugin.go +++ b/src/k8splugin/plugins/generic/plugin.go @@ -22,6 +22,7 @@ import ( appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/kubernetes" + //appsv1beta1 "k8s.io/api/apps/v1beta1" //appsv1beta2 "k8s.io/api/apps/v1beta2" batchv1 "k8s.io/api/batch/v1" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" pkgerrors "github.com/pkg/errors" + "github.com/prometheus/common/log" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -304,7 +306,18 @@ func (g genericPlugin) Create(yamlFilePath string, namespace string, client plug if err != nil { return "", pkgerrors.Wrap(err, "Mapping kind to resource error") } - + if gvk.Kind == "CustomResourceDefinition" { + //according the helm spec, CRD is created only once, and we raise only warn if we try to do it once more + resource := helm.KubernetesResource{} + resource.GVK = gvk + resource.Name = unstruct.GetName() + name, err := g.Get(resource, namespace, client) + if err == nil && name == resource.Name { + //CRD update is not supported according to Helm spec + log.Warn(fmt.Sprintf("CRD %s create will be skipped. It already exists", name)) + return name, nil + } + } //Add the tracking label to all resources created here labels := unstruct.GetLabels() //Check if labels exist for this object @@ -340,58 +353,70 @@ func (g genericPlugin) Create(yamlFilePath string, namespace string, client plug // Update deployment object in a specific Kubernetes cluster func (g genericPlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) { - if namespace == "" { - namespace = "default" - } - - //Decode the yaml file to create a runtime.Object - unstruct := &unstructured.Unstructured{} - //Ignore the returned obj as we expect the data in unstruct - _, err := utils.DecodeYAML(yamlFilePath, unstruct) - if err != nil { - return "", pkgerrors.Wrap(err, "Decode deployment object error") - } - - dynClient := client.GetDynamicClient() - mapper := client.GetMapper() - - gvk := unstruct.GroupVersionKind() - mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) - if err != nil { - return "", pkgerrors.Wrap(err, "Mapping kind to resource error") - } - - //Add the tracking label to all resources created here - labels := unstruct.GetLabels() - //Check if labels exist for this object - if labels == nil { - labels = map[string]string{} - } - labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() - unstruct.SetLabels(labels) - - // This checks if the resource we are creating has a podSpec in it - // Eg: Deployment, StatefulSet, Job etc.. - // If a PodSpec is found, the label will be added to it too. - plugin.TagPodsIfPresent(unstruct, client.GetInstanceID()) - - gvr := mapping.Resource - var updatedObj *unstructured.Unstructured - - switch mapping.Scope.Name() { - case meta.RESTScopeNameNamespace: - updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{}) - case meta.RESTScopeNameRoot: - updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{}) - default: - return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String()) - } - - if err != nil { - return "", pkgerrors.Wrap(err, "Update object error") - } - - return updatedObj.GetName(), nil + if namespace == "" { + namespace = "default" + } + + //Decode the yaml file to create a runtime.Object + unstruct := &unstructured.Unstructured{} + //Ignore the returned obj as we expect the data in unstruct + _, err := utils.DecodeYAML(yamlFilePath, unstruct) + if err != nil { + return "", pkgerrors.Wrap(err, "Decode deployment object error") + } + + dynClient := client.GetDynamicClient() + mapper := client.GetMapper() + + gvk := unstruct.GroupVersionKind() + mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version) + if err != nil { + return "", pkgerrors.Wrap(err, "Mapping kind to resource error") + } + + if gvk.Kind == "CustomResourceDefinition" { + resource := helm.KubernetesResource{} + resource.GVK = gvk + resource.Name = unstruct.GetName() + name, err := g.Get(resource, namespace, client) + if err == nil && name == resource.Name { + //CRD update is not supported according to Helm spec + log.Warn(fmt.Sprintf("CRD %s update will be skipped", name)) + return name, nil + } + } + + //Add the tracking label to all resources created here + labels := unstruct.GetLabels() + //Check if labels exist for this object + if labels == nil { + labels = map[string]string{} + } + labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + unstruct.SetLabels(labels) + + // This checks if the resource we are creating has a podSpec in it + // Eg: Deployment, StatefulSet, Job etc.. + // If a PodSpec is found, the label will be added to it too. + plugin.TagPodsIfPresent(unstruct, client.GetInstanceID()) + + gvr := mapping.Resource + var updatedObj *unstructured.Unstructured + + switch mapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{}) + case meta.RESTScopeNameRoot: + updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{}) + default: + return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String()) + } + + if err != nil { + return "", pkgerrors.Wrap(err, "Update object error") + } + + return updatedObj.GetName(), nil } // Get an existing resource hosted in a specific Kubernetes cluster @@ -425,7 +450,7 @@ func (g genericPlugin) Get(resource helm.KubernetesResource, } if err != nil { - return "", pkgerrors.Wrap(err, "Delete object error") + return "", pkgerrors.Wrap(err, "Get object error") } return unstruct.GetName(), nil @@ -462,6 +487,11 @@ func (g genericPlugin) Delete(resource helm.KubernetesResource, namespace string opts := metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, } + if resource.GVK.Kind == "CustomResourceDefinition" { + //CRD deletion is not supported according to Helm spec + log.Warn(fmt.Sprintf("CRD %s deletion will be skipped", resource.Name)) + return nil + } switch mapping.Scope.Name() { case meta.RESTScopeNameNamespace: diff --git a/src/k8splugin/plugins/namespace/plugin.go b/src/k8splugin/plugins/namespace/plugin.go index 8732442e..6c6d1f6c 100644 --- a/src/k8splugin/plugins/namespace/plugin.go +++ b/src/k8splugin/plugins/namespace/plugin.go @@ -21,10 +21,10 @@ import ( pkgerrors "github.com/pkg/errors" coreV1 "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/api/meta" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -60,7 +60,12 @@ func (p namespacePlugin) Create(yamlFilePath string, namespace string, client pl Name: namespace, }, } - _, err := client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{}) + existingNs, err := client.GetStandardClient().CoreV1().Namespaces().Get(context.TODO(), namespace, metaV1.GetOptions{}) + if err == nil && len(existingNs.ManagedFields) > 0 && existingNs.ManagedFields[0].Manager == "k8plugin" { + log.Printf("Namespace (%s) already ensured by plugin. Skip", namespace) + return namespace, nil + } + _, err = client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{}) if err != nil { return "", pkgerrors.Wrap(err, "Create Namespace error") } @@ -128,5 +133,5 @@ func (p namespacePlugin) List(gvk schema.GroupVersionKind, namespace string, cli func (p namespacePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) { - return "", nil + return namespace, nil } diff --git a/src/k8splugin/plugins/service/plugin.go b/src/k8splugin/plugins/service/plugin.go index aa5c685c..52dd4591 100644 --- a/src/k8splugin/plugins/service/plugin.go +++ b/src/k8splugin/plugins/service/plugin.go @@ -21,10 +21,10 @@ import ( pkgerrors "github.com/pkg/errors" coreV1 "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/api/meta" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -156,8 +156,43 @@ func (p servicePlugin) Get(resource helm.KubernetesResource, namespace string, c return service.Name, nil } +// Update a service object in a specific Kubernetes cluster func (p servicePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) { + if namespace == "" { + namespace = "default" + } - return "", nil + obj, err := utils.DecodeYAML(yamlFilePath, nil) + if err != nil { + return "", pkgerrors.Wrap(err, "Decode service object error") + } + service, ok := obj.(*coreV1.Service) + if !ok { + return "", pkgerrors.New("Decoded object contains another resource different than Service") + } + service.Namespace = namespace + + existingService, err := client.GetStandardClient().CoreV1().Services(namespace).Get(context.TODO(), service.Name, metaV1.GetOptions{}) + if err == nil { + service.ResourceVersion = existingService.ResourceVersion + service.Spec.ClusterIP = existingService.Spec.ClusterIP + } else { + return p.Create(yamlFilePath, namespace, client) + } + labels := service.GetLabels() + //Check if labels exist for this object + if labels == nil { + labels = map[string]string{} + } + labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID() + service.SetLabels(labels) + + _, err = client.GetStandardClient().CoreV1().Services(namespace).Update(context.TODO(), service, metaV1.UpdateOptions{}) + + if err != nil { + return "", pkgerrors.Wrap(err, "Update object error") + } + + return service.Name, nil } |