From 120019529489b5cbcf82d77eec228283fb12d43a Mon Sep 17 00:00:00 2001 From: Konrad Bańka Date: Fri, 12 Mar 2021 08:46:20 +0100 Subject: Fix Healthcheck API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix several issues related to Healthcheck creation. Updated GET/DELETE methods to work properly. This commit leaves few FIXME/TODOs that will be handled within Helm3 Rebase commit Issue-ID: MULTICLOUD-1308 Signed-off-by: Konrad Bańka Change-Id: I5da50363bb240fdc85d3624f43cb0526786da542 --- src/k8splugin/api/instancehchandler.go | 67 ++++++- src/k8splugin/go.mod | 1 + src/k8splugin/internal/healthcheck/healthcheck.go | 214 ++++++++++++++++++---- src/k8splugin/internal/healthcheck/kubeclient.go | 4 +- src/k8splugin/internal/healthcheck/stream.go | 75 ++++++++ src/k8splugin/internal/helm/helm.go | 16 +- 6 files changed, 330 insertions(+), 47 deletions(-) create mode 100644 src/k8splugin/internal/healthcheck/stream.go diff --git a/src/k8splugin/api/instancehchandler.go b/src/k8splugin/api/instancehchandler.go index fc1c3be4..90715178 100644 --- a/src/k8splugin/api/instancehchandler.go +++ b/src/k8splugin/api/instancehchandler.go @@ -46,7 +46,7 @@ func (ih instanceHCHandler) createHandler(w http.ResponseWriter, r *http.Request } w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) + w.WriteHeader(http.StatusCreated) err = json.NewEncoder(w).Encode(resp) if err != nil { log.Error("Error Marshaling Reponse", log.Fields{ @@ -59,10 +59,75 @@ func (ih instanceHCHandler) createHandler(w http.ResponseWriter, r *http.Request } func (ih instanceHCHandler) getHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceId := vars["instID"] + healthcheckId := vars["hcID"] + + resp, err := ih.client.Get(instanceId, healthcheckId) + if err != nil { + log.Error("Error getting Instance's healthcheck", log.Fields{ + "error": err, + "instanceID": instanceId, + "healthcheckID": healthcheckId, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } func (ih instanceHCHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceId := vars["instID"] + healthcheckId := vars["hcID"] + + err := ih.client.Delete(instanceId, healthcheckId) + if err != nil { + log.Error("Error deleting Instance's healthcheck", log.Fields{ + "error": err, + "instanceID": instanceId, + "healthcheckID": healthcheckId, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) } func (ih instanceHCHandler) listHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["instID"] + + resp, err := ih.client.List(id) + if err != nil { + log.Error("Error getting instance healthcheck overview", log.Fields{ + "error": err, + "instance-id": id, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } diff --git a/src/k8splugin/go.mod b/src/k8splugin/go.mod index 5371bb50..dfb3259a 100644 --- a/src/k8splugin/go.mod +++ b/src/k8splugin/go.mod @@ -81,6 +81,7 @@ require ( golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect google.golang.org/appengine v1.5.0 // indirect + google.golang.org/grpc v1.19.0 gopkg.in/gorp.v1 v1.7.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/square/go-jose.v2 v2.2.2 // indirect diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go index 341b1dba..cf1d39bd 100644 --- a/src/k8splugin/internal/healthcheck/healthcheck.go +++ b/src/k8splugin/internal/healthcheck/healthcheck.go @@ -15,6 +15,8 @@ package healthcheck import ( "encoding/json" + "strings" + "time" protorelease "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/releasetesting" @@ -27,22 +29,18 @@ import ( pkgerrors "github.com/pkg/errors" ) -// HealthcheckState holds possible states of Healthcheck instance -type HealthcheckState string - -const ( - HcS_UNKNOWN HealthcheckState = "UNKNOWN" - HcS_STARTED HealthcheckState = "STARTED" - HcS_RUNNING HealthcheckState = "RUNNING" - HcS_SUCCESS HealthcheckState = "SUCCESS" - HcS_FAILURE HealthcheckState = "FAILURE" +var ( + HcS_UNKNOWN string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_UNKNOWN)] + HcS_RUNNING string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_RUNNING)] + HcS_SUCCESS string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_SUCCESS)] + HcS_FAILURE string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_FAILURE)] ) // InstanceHCManager interface exposes instance Healthcheck fuctionalities type InstanceHCManager interface { - Create(instanceId string) (InstanceHCStatus, error) + Create(instanceId string) (InstanceMiniHCStatus, error) Get(instanceId, healthcheckId string) (InstanceHCStatus, error) - List(instanceId string) ([]InstanceHCStatus, error) + List(instanceId string) (InstanceHCOverview, error) Delete(instanceId, healthcheckId string) error } @@ -71,9 +69,24 @@ type InstanceHCClient struct { // InstanceHCStatus holds healthcheck status type InstanceHCStatus struct { - releasetesting.TestSuite - Id string - Status HealthcheckState + InstanceId string `json:"instance-id"` + HealthcheckId string `json:"healthcheck-id"` + Status string `json:"status"` + Info string `json:"info"` + releasetesting.TestSuite `json:"test-suite"` +} + +// InstanceMiniHCStatus holds only healthcheck summary +type InstanceMiniHCStatus struct { + HealthcheckId string `json:"healthcheck-id"` + Status string `json:"status"` +} + +// InstanceHCOverview holds Healthcheck-related data +type InstanceHCOverview struct { + InstanceId string `json:"instance-id"` + HCSummary []InstanceMiniHCStatus `json:"healthcheck-summary"` + Hooks []*protorelease.Hook `json:"hooks"` } func NewHCClient() *InstanceHCClient { @@ -83,7 +96,11 @@ func NewHCClient() *InstanceHCClient { } } -func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error) { +func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus { + return InstanceMiniHCStatus{ihcs.HealthcheckId, ihcs.Status} +} + +func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) { //Generate ID id := namegenerator.Generate() @@ -91,67 +108,190 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error) v := app.NewInstanceClient() instance, err := v.Get(instanceId) if err != nil { - return InstanceHCStatus{}, pkgerrors.Wrap(err, "Getting instance") + return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance") } //Prepare Environment, Request and Release structs //TODO In future could derive params from request client, err := NewKubeClient(instanceId, instance.Request.CloudRegion) if err != nil { - return InstanceHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient") + return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient") + } + key := HealthcheckKey{ + InstanceId: instanceId, + HealthcheckId: id, } env := &releasetesting.Environment{ Namespace: instance.Namespace, KubeClient: client, Parallel: false, + Stream: NewStream(key, ihc.storeName, ihc.tagInst), } release := protorelease.Release{ Name: instance.ReleaseName, Hooks: instance.Hooks, } - //Run HC + //Define HC testSuite, err := releasetesting.NewTestSuite(&release) if err != nil { log.Error("Error creating TestSuite", log.Fields{ "Release": release, }) - return InstanceHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite") - } - if err = testSuite.Run(env); err != nil { - log.Error("Error running TestSuite", log.Fields{ - "TestSuite": testSuite, - "Environment": env, - }) - return InstanceHCStatus{}, pkgerrors.Wrap(err, "Running TestSuite") + return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite") } //Save state ihcs := InstanceHCStatus{ - TestSuite: *testSuite, - Id: id, - Status: HcS_STARTED, - } - key := HealthcheckKey{ - InstanceId: instance.ID, + TestSuite: *testSuite, HealthcheckId: id, + InstanceId: instanceId, + Status: HcS_RUNNING, } err = db.DBconn.Create(ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { - return ihcs, pkgerrors.Wrap(err, "Creating Instance DB Entry") + return instanceMiniHCStatusFromStatus(ihcs), + pkgerrors.Wrap(err, "Creating Instance DB Entry") } - return ihcs, nil + // Run HC async + // If testSuite doesn't fail immediately, we let it continue in background + errC := make(chan error, 1) + timeout := make(chan bool, 1) + // Stream handles updates of testSuite run so we don't need to care + var RunAsync func() = func() { + err := ihcs.TestSuite.Run(env) + if err != nil { + log.Error("Error running TestSuite", log.Fields{ + "TestSuite": ihcs.TestSuite, + "Environment": env, + "Error": err.Error(), + }) + ihcs.Status = HcS_FAILURE + ihcs.Info = ihcs.Info + "\n" + err.Error() + } + // Send to channel before db update as it can be slow + errC <- err + // Download latest Status/Info + resp, err := ihc.Get(ihcs.InstanceId, ihcs.HealthcheckId) + if err != nil { + log.Error("Error querying Healthcheck status", log.Fields{"error": err}) + return + } + ihcs.Status = resp.Status + ihcs.Info = resp.Info + // Update DB + err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs) + if err != nil { + log.Error("Error saving Testsuite failure in DB", log.Fields{ + "InstanceHCStatus": ihcs, + "Error": err, + }) + } + } + go func() { + //Time is hardcoded but the only impact is that typically http response is sent after 2s + //Could be considered shorter in future + time.Sleep(2 * time.Second) + timeout <- true + }() + go RunAsync() + select { + case err := <-errC: + if err == nil { + return instanceMiniHCStatusFromStatus(ihcs), nil + } else { + return instanceMiniHCStatusFromStatus(ihcs), err + } + case <-timeout: + return instanceMiniHCStatusFromStatus(ihcs), nil + } } func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCStatus, error) { - return InstanceHCStatus{}, nil + key := HealthcheckKey{ + InstanceId: instanceId, + HealthcheckId: healthcheckId, + } + DBResp, err := db.DBconn.Read(ihc.storeName, key, ihc.tagInst) + if err != nil || DBResp == nil { + return InstanceHCStatus{}, pkgerrors.Wrap(err, "Error retrieving Healthcheck data") + } + + resp := InstanceHCStatus{} + err = db.DBconn.Unmarshal(DBResp, &resp) + if err != nil { + return InstanceHCStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value") + } + return resp, nil } func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error { + v := app.NewInstanceClient() + instance, err := v.Get(instanceId) + if err != nil { + return pkgerrors.Wrap(err, "Getting instance") + } + client, err := NewKubeClient(instanceId, instance.Request.CloudRegion) + if err != nil { + return pkgerrors.Wrap(err, "Preparing KubeClient") + } + env := &releasetesting.Environment{ + Namespace: instance.Namespace, + KubeClient: client, + } + ihcs, err := ihc.Get(instanceId, healthcheckId) + if err != nil { + return pkgerrors.Wrap(err, "Error querying Healthcheck status") + } + env.DeleteTestPods(ihcs.TestSuite.TestManifests) + key := HealthcheckKey{instanceId, healthcheckId} + err = db.DBconn.Delete(ihc.storeName, key, ihc.tagInst) + if err != nil { + return pkgerrors.Wrap(err, "Removing Healthcheck in DB") + } return nil } -func (ihc InstanceHCClient) List(instanceId string) ([]InstanceHCStatus, error) { - return make([]InstanceHCStatus, 0), nil +func (ihc InstanceHCClient) List(instanceId string) (InstanceHCOverview, error) { + ihco := InstanceHCOverview{ + InstanceId: instanceId, + } + + // Retrieve info about available hooks + v := app.NewInstanceClient() + instance, err := v.Get(instanceId) + if err != nil { + return ihco, pkgerrors.Wrap(err, "Getting Instance data") + } + ihco.Hooks = instance.Hooks + + // Retrieve info about running/completed healthchecks + dbResp, err := db.DBconn.ReadAll(ihc.storeName, ihc.tagInst) + if err != nil { + if !strings.Contains(err.Error(), "Did not find any objects with tag") { + return ihco, pkgerrors.Wrap(err, "Getting Healthcheck data") + } + } + miniStatus := make([]InstanceMiniHCStatus, 0) + for key, value := range dbResp { + //value is a byte array + if value != nil { + resp := InstanceHCStatus{} + err = db.DBconn.Unmarshal(value, &resp) + if err != nil { + log.Error("Error unmarshaling Instance HC data", log.Fields{ + "error": err.Error(), + "key": key}) + } + //Filter-out healthchecks from other instances + if instanceId != resp.InstanceId { + continue + } + miniStatus = append(miniStatus, instanceMiniHCStatusFromStatus(resp)) + } + } + ihco.HCSummary = miniStatus + + return ihco, nil } diff --git a/src/k8splugin/internal/healthcheck/kubeclient.go b/src/k8splugin/internal/healthcheck/kubeclient.go index be4c6fcc..2a168a78 100644 --- a/src/k8splugin/internal/healthcheck/kubeclient.go +++ b/src/k8splugin/internal/healthcheck/kubeclient.go @@ -25,13 +25,15 @@ import ( //implements environment.KubeClient but overrides it so that //custom labels can be injected into created resources -// using internal k8sClient +//using internal k8sClient type KubeClientImpl struct { environment.KubeClient labels map[string]string k app.KubernetesClient } +var _ environment.KubeClient = KubeClientImpl{} + func NewKubeClient(instanceId, cloudRegion string) (*KubeClientImpl, error) { k8sClient := app.KubernetesClient{} err := k8sClient.Init(cloudRegion, instanceId) diff --git a/src/k8splugin/internal/healthcheck/stream.go b/src/k8splugin/internal/healthcheck/stream.go new file mode 100644 index 00000000..d7c6e654 --- /dev/null +++ b/src/k8splugin/internal/healthcheck/stream.go @@ -0,0 +1,75 @@ +/* +Copyright © 2021 Samsung Electronics +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "google.golang.org/grpc" + "k8s.io/helm/pkg/proto/hapi/release" + "k8s.io/helm/pkg/proto/hapi/services" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" + + pkgerrors "github.com/pkg/errors" +) + +//implements basic stream implementation that logs progress +//and updates state in DB +type StreamImpl struct { + Key HealthcheckKey + StoreName string + Tag string + grpc.ServerStream //only to satisfy interface needs, it's not used +} + +var _ services.ReleaseService_RunReleaseTestServer = StreamImpl{} + +func NewStream(key HealthcheckKey, store, tag string) *StreamImpl { + s := StreamImpl{ + Key: key, + StoreName: store, + Tag: tag, + } + return &s +} + +func (si StreamImpl) Send(m *services.TestReleaseResponse) error { + log.Info("Stream message", log.Fields{"msg": m}) + + DBResp, err := db.DBconn.Read(si.StoreName, si.Key, si.Tag) + if err != nil || DBResp == nil { + return pkgerrors.Wrap(err, "Error retrieving Healthcheck data") + } + + resp := InstanceHCStatus{} + err = db.DBconn.Unmarshal(DBResp, &resp) + if err != nil { + return pkgerrors.Wrap(err, "Unmarshaling Healthcheck Value") + } + + resp.Status = release.TestRun_Status_name[int32(m.Status)] + if m.Msg != "" { + if resp.Info == "" { + resp.Info = m.Msg + } else { + resp.Info = resp.Info + "\n" + m.Msg + } + } + + err = db.DBconn.Update(si.StoreName, si.Key, si.Tag, resp) + if err != nil { + return pkgerrors.Wrap(err, "Updating Healthcheck") + } + return nil +} diff --git a/src/k8splugin/internal/helm/helm.go b/src/k8splugin/internal/helm/helm.go index d39e8404..31047eb6 100644 --- a/src/k8splugin/internal/helm/helm.go +++ b/src/k8splugin/internal/helm/helm.go @@ -305,14 +305,6 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile continue } - hook, _ := isHook(b, data) - // if hook is not nil, then append it to hooks list and continue - // if it's not, disregard error - if hook != nil { - hookList = append(hookList, hook) - continue - } - mfilePath := filepath.Join(outputDir, m.Name) utils.EnsureDirectory(mfilePath) err = ioutil.WriteFile(mfilePath, []byte(data), 0666) @@ -320,6 +312,14 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile return retData, hookList, err } + hook, _ := isHook(mfilePath, data) + // if hook is not nil, then append it to hooks list and continue + // if it's not, disregard error + if hook != nil { + hookList = append(hookList, hook) + continue + } + gvk, err := getGroupVersionKind(data) if err != nil { return retData, hookList, err -- cgit 1.2.3-korg