diff options
author | Lukasz Rajewski <lukasz.rajewski@orange.com> | 2021-04-20 16:59:11 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2021-04-20 16:59:11 +0000 |
commit | 4ff7900c3c74fa6b497af6d1e36f81e08a193d83 (patch) | |
tree | d6ce5f3dffe6130fd8c20b853ba6dc2fce48c349 /src/k8splugin/internal/healthcheck | |
parent | c7f90394d625a8e323e8095e711338ccc44c472b (diff) | |
parent | 1f60346da61383f18b7277037439711aef38a0fe (diff) |
Merge "Migrate to use Helm v3 libraries"
Diffstat (limited to 'src/k8splugin/internal/healthcheck')
-rw-r--r-- | src/k8splugin/internal/healthcheck/healthcheck.go | 261 | ||||
-rw-r--r-- | src/k8splugin/internal/healthcheck/hooks.go | 145 | ||||
-rw-r--r-- | src/k8splugin/internal/healthcheck/kubeclient.go | 61 | ||||
-rw-r--r-- | src/k8splugin/internal/healthcheck/stream.go | 75 |
4 files changed, 307 insertions, 235 deletions
diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go index cf1d39bd..70f5bec2 100644 --- a/src/k8splugin/internal/healthcheck/healthcheck.go +++ b/src/k8splugin/internal/healthcheck/healthcheck.go @@ -16,26 +16,20 @@ package healthcheck import ( "encoding/json" "strings" - "time" + "sync" - protorelease "k8s.io/helm/pkg/proto/hapi/release" - "k8s.io/helm/pkg/releasetesting" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/time" "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator" pkgerrors "github.com/pkg/errors" ) -var ( - HcS_UNKNOWN string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_UNKNOWN)] - HcS_RUNNING string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_RUNNING)] - HcS_SUCCESS string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_SUCCESS)] - HcS_FAILURE string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_FAILURE)] -) - // InstanceHCManager interface exposes instance Healthcheck fuctionalities type InstanceHCManager interface { Create(instanceId string) (InstanceMiniHCStatus, error) @@ -69,24 +63,30 @@ type InstanceHCClient struct { // InstanceHCStatus holds healthcheck status type InstanceHCStatus struct { - InstanceId string `json:"instance-id"` - HealthcheckId string `json:"healthcheck-id"` - Status string `json:"status"` - Info string `json:"info"` - releasetesting.TestSuite `json:"test-suite"` + InstanceId string `json:"instance-id"` + HealthcheckId string `json:"healthcheck-id"` + Status release.HookPhase `json:"status"` + TestSuite *TestSuite `json:"test-suite"` //TODO could be merged with current struct +} + +// TestSuite is a structure to hold compatibility with pre helm3 output +type TestSuite struct { + StartedAt time.Time + CompletedAt time.Time + Results []*HookStatus } // InstanceMiniHCStatus holds only healthcheck summary type InstanceMiniHCStatus struct { - HealthcheckId string `json:"healthcheck-id"` - Status string `json:"status"` + HealthcheckId string `json:"healthcheck-id"` + Status release.HookPhase `json:"status"` } // InstanceHCOverview holds Healthcheck-related data type InstanceHCOverview struct { InstanceId string `json:"instance-id"` HCSummary []InstanceMiniHCStatus `json:"healthcheck-summary"` - Hooks []*protorelease.Hook `json:"hooks"` + Hooks []*helm.Hook `json:"hooks"` } func NewHCClient() *InstanceHCClient { @@ -101,6 +101,8 @@ func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus } func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) { + //TODO Handle hook delete policies + //Generate ID id := namegenerator.Generate() @@ -111,9 +113,8 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance") } - //Prepare Environment, Request and Release structs - //TODO In future could derive params from request - client, err := NewKubeClient(instanceId, instance.Request.CloudRegion) + k8sClient := app.KubernetesClient{} + err = k8sClient.Init(instance.Request.CloudRegion, instanceId) if err != nil { return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient") } @@ -121,91 +122,146 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err InstanceId: instanceId, HealthcheckId: id, } - env := &releasetesting.Environment{ - Namespace: instance.Namespace, - KubeClient: client, - Parallel: false, - Stream: NewStream(key, ihc.storeName, ihc.tagInst), - } - release := protorelease.Release{ - Name: instance.ReleaseName, - Hooks: instance.Hooks, - } - //Define HC - testSuite, err := releasetesting.NewTestSuite(&release) - if err != nil { - log.Error("Error creating TestSuite", log.Fields{ - "Release": release, - }) - return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite") + //Filter out only relevant hooks + hooks := hookPairs{} + for _, hook := range instance.Hooks { + for _, hookEvent := range hook.Hook.Events { + if hookEvent == release.HookTest { //Helm3 no longer supports test-failure + hooks = append(hooks, hookPair{ + Definition: hook, + Status: &HookStatus{ + Name: hook.Hook.Name, + }, + }) + } + } } //Save state + testSuite := TestSuite{ + StartedAt: time.Now(), + Results: hooks.statuses(), + } ihcs := InstanceHCStatus{ - TestSuite: *testSuite, - HealthcheckId: id, InstanceId: instanceId, - Status: HcS_RUNNING, + HealthcheckId: id, + Status: release.HookPhaseRunning, + TestSuite: &testSuite, + } + + for _, h := range hooks { + h.Status.StartedAt = time.Now() + kr, err := k8sClient.CreateKind(h.Definition.KRT, instance.Namespace) + if err != nil { + // Set status fields + h.Status.Status = release.HookPhaseFailed + h.Status.CompletedAt = time.Now() + testSuite.CompletedAt = time.Now() + ihcs.Status = release.HookPhaseFailed + retErr := "Starting hook " + h.Status.Name + + // Dump to DB + err = db.DBconn.Create(ihc.storeName, key, ihc.tagInst, ihcs) + if err != nil { + retErr = retErr + " and couldn't save to DB" + } + + return instanceMiniHCStatusFromStatus(ihcs), + pkgerrors.Wrap(err, retErr) + } else { + h.Status.Status = release.HookPhaseRunning + h.Status.KR = kr + } } err = db.DBconn.Create(ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { return instanceMiniHCStatusFromStatus(ihcs), pkgerrors.Wrap(err, "Creating Instance DB Entry") } - - // Run HC async - // If testSuite doesn't fail immediately, we let it continue in background - errC := make(chan error, 1) - timeout := make(chan bool, 1) - // Stream handles updates of testSuite run so we don't need to care - var RunAsync func() = func() { - err := ihcs.TestSuite.Run(env) - if err != nil { - log.Error("Error running TestSuite", log.Fields{ - "TestSuite": ihcs.TestSuite, - "Environment": env, - "Error": err.Error(), - }) - ihcs.Status = HcS_FAILURE - ihcs.Info = ihcs.Info + "\n" + err.Error() - } - // Send to channel before db update as it can be slow - errC <- err - // Download latest Status/Info - resp, err := ihc.Get(ihcs.InstanceId, ihcs.HealthcheckId) - if err != nil { - log.Error("Error querying Healthcheck status", log.Fields{"error": err}) - return + log.Info("Successfully initialized Healthcheck resources", log.Fields{ + "InstanceId": instanceId, + "HealthcheckId": id, + }) + go func() { + var wg sync.WaitGroup + update := make(chan bool) //True - hook finished, False - all hooks finished + for _, h := range hooks { + wg.Add(1) + go func(hookStatus *HookStatus) { + //TODO Handle errors here better in future, for now it's ok + hookStatus.Status, _ = getHookState(*hookStatus, k8sClient, instance.Namespace) + hookStatus.CompletedAt = time.Now() + log.Info("Hook finished", log.Fields{ + "HealthcheckId": id, + "InstanceId": instanceId, + "Hook": hookStatus.Name, + "Status": hookStatus.Status, + }) + update <- true + wg.Done() + return + }(h.Status) } - ihcs.Status = resp.Status - ihcs.Info = resp.Info - // Update DB - err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs) - if err != nil { - log.Error("Error saving Testsuite failure in DB", log.Fields{ - "InstanceHCStatus": ihcs, - "Error": err, + go func() { + wg.Wait() + log.Info("All hooks finished", log.Fields{ + "HealthcheckId": id, + "InstanceId": instanceId, }) + update <- false + return + }() + for { + select { + case b := <-update: + log.Info("Healthcheck update", log.Fields{ + "HealthcheckId": id, + "InstanceId": instanceId, + "Reason": map[bool]string{true: "Hook finished", false: "All hooks finished"}[b], + }) + if b { //Some hook finished - need to update DB + err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs) + if err != nil { + log.Error("Couldn't update database", log.Fields{ + "Store": ihc.storeName, + "Key": key, + "Payload": ihcs, + }) + } + } else { //All hooks finished - need to terminate goroutine + testSuite.CompletedAt = time.Now() + //If everything's fine, final result is OK + finalResult := release.HookPhaseSucceeded + //If at least single hook wasn't determined - it's Unknown + for _, h := range hooks { + if h.Status.Status == release.HookPhaseUnknown { + finalResult = release.HookPhaseUnknown + break + } + } + //Unless at least one hook failed, then we've failed + for _, h := range hooks { + if h.Status.Status == release.HookPhaseFailed { + finalResult = release.HookPhaseFailed + break + } + } + ihcs.Status = finalResult + err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs) + if err != nil { + log.Error("Couldn't update database", log.Fields{ + "Store": ihc.storeName, + "Key": key, + "Payload": ihcs, + }) + } + return + } + } } - } - go func() { - //Time is hardcoded but the only impact is that typically http response is sent after 2s - //Could be considered shorter in future - time.Sleep(2 * time.Second) - timeout <- true }() - go RunAsync() - select { - case err := <-errC: - if err == nil { - return instanceMiniHCStatusFromStatus(ihcs), nil - } else { - return instanceMiniHCStatusFromStatus(ihcs), err - } - case <-timeout: - return instanceMiniHCStatusFromStatus(ihcs), nil - } + return instanceMiniHCStatusFromStatus(ihcs), nil } func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCStatus, error) { @@ -227,25 +283,32 @@ func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCSta } func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error { + key := HealthcheckKey{instanceId, healthcheckId} v := app.NewInstanceClient() instance, err := v.Get(instanceId) if err != nil { return pkgerrors.Wrap(err, "Getting instance") } - client, err := NewKubeClient(instanceId, instance.Request.CloudRegion) + ihcs, err := ihc.Get(instanceId, healthcheckId) + if err != nil { + return pkgerrors.Wrap(err, "Error querying Healthcheck status") + } + k8sClient := app.KubernetesClient{} + err = k8sClient.Init(instance.Request.CloudRegion, instanceId) if err != nil { return pkgerrors.Wrap(err, "Preparing KubeClient") } - env := &releasetesting.Environment{ - Namespace: instance.Namespace, - KubeClient: client, + cumulatedErr := "" + for _, hook := range ihcs.TestSuite.Results { + err = k8sClient.DeleteKind(hook.KR, instance.Namespace) + //FIXME handle "missing resource" error as not error - hook may be already deleted + if err != nil { + cumulatedErr += err.Error() + "\n" + } } - ihcs, err := ihc.Get(instanceId, healthcheckId) - if err != nil { - return pkgerrors.Wrap(err, "Error querying Healthcheck status") + if cumulatedErr != "" { + return pkgerrors.New("Removing hooks failed with errors:\n" + cumulatedErr) } - env.DeleteTestPods(ihcs.TestSuite.TestManifests) - key := HealthcheckKey{instanceId, healthcheckId} err = db.DBconn.Delete(ihc.storeName, key, ihc.tagInst) if err != nil { return pkgerrors.Wrap(err, "Removing Healthcheck in DB") diff --git a/src/k8splugin/internal/healthcheck/hooks.go b/src/k8splugin/internal/healthcheck/hooks.go new file mode 100644 index 00000000..0b1be9ac --- /dev/null +++ b/src/k8splugin/internal/healthcheck/hooks.go @@ -0,0 +1,145 @@ +/* +Copyright © 2021 Samsung Electronics +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "time" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" + + "helm.sh/helm/v3/pkg/release" + helmtime "helm.sh/helm/v3/pkg/time" + + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + + pkgerrors "github.com/pkg/errors" +) + +type HookStatus struct { + StartedAt helmtime.Time `json:"started_at"` + CompletedAt helmtime.Time `json:"completed_at"` + Status release.HookPhase `json:"status"` + Name string `json:"name"` + KR helm.KubernetesResource `json:"-"` +} + +// Helper type to combine Hook definition with Status +type hookPair struct { + Definition *helm.Hook + Status *HookStatus +} + +// Wraper type to implement helper method for extraction +type hookPairs []hookPair + +// Helper function to retrieve slice of Statuses from slice of hookPairs +func (hps hookPairs) statuses() (hsps []*HookStatus) { + for _, hp := range hps { + hsps = append(hsps, hp.Status) + } + return +} + +// TODO Optimize by using k8s.io/client-go/tools/cache.NewListWatchFromClient just like +// in helm.sh/helm/v3/pkg/kube/client.go -> watchUntilReady() +func getHookState(hookStatus HookStatus, k8sClient app.KubernetesClient, namespace string) (release.HookPhase, error) { + // Initial check of Hook Resource type + switch hookStatus.KR.GVK.Kind { + case "Job", "Pod": + default: + //We don't know how to check state of such resource + return release.HookPhaseUnknown, nil + } + + for { + res, err := k8sClient.GetResourceStatus(hookStatus.KR, namespace) + if err != nil { + log.Error("Unable to check Resource Status", log.Fields{ + "Resource": hookStatus.KR, + "Namespace": namespace, + "Error": err, + }) + return release.HookPhaseUnknown, + pkgerrors.Wrap(err, "Unable to check Resource Status") + } + + var parsedRes runtime.Object + switch hookStatus.KR.GVK.Kind { + case "Job": + parsedRes = new(batch.Job) + case "Pod": + parsedRes = new(v1.Pod) + } + err = runtime.DefaultUnstructuredConverter.FromUnstructured(res.Status.Object, parsedRes) + if err != nil { + log.Error("Couldn't convert Response into runtime object", log.Fields{ + "Response": res.Status.Object, + "Error": err, + }) + return release.HookPhaseUnknown, + pkgerrors.Wrap(err, "Couldn't conver Response into runtime object") + } + + var tempState release.HookPhase + switch hookStatus.KR.GVK.Kind { + case "Job": + tempState = parseJobState(parsedRes) + case "Pod": + tempState = parsePodState(parsedRes) + } + if tempState != release.HookPhaseRunning { + return tempState, nil + } + //TODO should be changed to "Watching" of resource as pointed earlier + time.Sleep(5 * time.Second) + } +} + +// Based on kube/client.go -> waitForJob() +func parseJobState(obj runtime.Object) (state release.HookPhase) { + job, ok := obj.(*batch.Job) + if !ok { + //Something went wrong, and we don't want to parse such resource again + return release.HookPhaseUnknown + } + for _, c := range job.Status.Conditions { + if c.Type == batch.JobComplete && c.Status == "True" { + return release.HookPhaseSucceeded + } else if c.Type == batch.JobFailed && c.Status == "True" { + return release.HookPhaseFailed + } + } + return release.HookPhaseRunning +} + +// Based on kube/client.go -> waitForPodSuccess() +func parsePodState(obj runtime.Object) (state release.HookPhase) { + pod, ok := obj.(*v1.Pod) + if !ok { + return release.HookPhaseUnknown + } + + switch pod.Status.Phase { + case v1.PodSucceeded: + return release.HookPhaseSucceeded + case v1.PodFailed: + return release.HookPhaseFailed + default: + return release.HookPhaseRunning + } +} diff --git a/src/k8splugin/internal/healthcheck/kubeclient.go b/src/k8splugin/internal/healthcheck/kubeclient.go deleted file mode 100644 index 2a168a78..00000000 --- a/src/k8splugin/internal/healthcheck/kubeclient.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright © 2021 Samsung Electronics -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "k8s.io/helm/pkg/kube" - "k8s.io/helm/pkg/tiller/environment" - - "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" - - pkgerrors "github.com/pkg/errors" -) - -//implements environment.KubeClient but overrides it so that -//custom labels can be injected into created resources -//using internal k8sClient -type KubeClientImpl struct { - environment.KubeClient - labels map[string]string - k app.KubernetesClient -} - -var _ environment.KubeClient = KubeClientImpl{} - -func NewKubeClient(instanceId, cloudRegion string) (*KubeClientImpl, error) { - k8sClient := app.KubernetesClient{} - err := k8sClient.Init(cloudRegion, instanceId) - if err != nil { - return nil, pkgerrors.Wrap(err, "Initializing k8sClient") - } - return &KubeClientImpl{ - labels: map[string]string{ - config.GetConfiguration().KubernetesLabelName: instanceId, - }, - KubeClient: kube.New(&k8sClient), - k: k8sClient, - }, nil -} - -/* FIXME -// Need to correct this later and provide override of Create method to use our k8sClient -// So that healthcheck hook resources would be labeled with vf-module data just like currently -// every k8splugin-managed resource is - -//Create function is overrided to label test resources with custom labels -func (kci *KubeClientImpl) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error { - return nil -} -*/ diff --git a/src/k8splugin/internal/healthcheck/stream.go b/src/k8splugin/internal/healthcheck/stream.go deleted file mode 100644 index d7c6e654..00000000 --- a/src/k8splugin/internal/healthcheck/stream.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -Copyright © 2021 Samsung Electronics -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "google.golang.org/grpc" - "k8s.io/helm/pkg/proto/hapi/release" - "k8s.io/helm/pkg/proto/hapi/services" - - "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" - log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" - - pkgerrors "github.com/pkg/errors" -) - -//implements basic stream implementation that logs progress -//and updates state in DB -type StreamImpl struct { - Key HealthcheckKey - StoreName string - Tag string - grpc.ServerStream //only to satisfy interface needs, it's not used -} - -var _ services.ReleaseService_RunReleaseTestServer = StreamImpl{} - -func NewStream(key HealthcheckKey, store, tag string) *StreamImpl { - s := StreamImpl{ - Key: key, - StoreName: store, - Tag: tag, - } - return &s -} - -func (si StreamImpl) Send(m *services.TestReleaseResponse) error { - log.Info("Stream message", log.Fields{"msg": m}) - - DBResp, err := db.DBconn.Read(si.StoreName, si.Key, si.Tag) - if err != nil || DBResp == nil { - return pkgerrors.Wrap(err, "Error retrieving Healthcheck data") - } - - resp := InstanceHCStatus{} - err = db.DBconn.Unmarshal(DBResp, &resp) - if err != nil { - return pkgerrors.Wrap(err, "Unmarshaling Healthcheck Value") - } - - resp.Status = release.TestRun_Status_name[int32(m.Status)] - if m.Msg != "" { - if resp.Info == "" { - resp.Info = m.Msg - } else { - resp.Info = resp.Info + "\n" + m.Msg - } - } - - err = db.DBconn.Update(si.StoreName, si.Key, si.Tag, resp) - if err != nil { - return pkgerrors.Wrap(err, "Updating Healthcheck") - } - return nil -} |