summaryrefslogtreecommitdiffstats
path: root/src/k8splugin/internal/healthcheck
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2021-04-02 13:14:11 +0000
committerGerrit Code Review <gerrit@onap.org>2021-04-02 13:14:11 +0000
commite9a12efd25a570e64b728d8fc482939a727e9214 (patch)
treea0806977f1f153380a9dad9bd84679d9a9eb0ff1 /src/k8splugin/internal/healthcheck
parent705fad712373e73463fff22a3136e6e92b372048 (diff)
parent120019529489b5cbcf82d77eec228283fb12d43a (diff)
Merge "Fix Healthcheck API"
Diffstat (limited to 'src/k8splugin/internal/healthcheck')
-rw-r--r--src/k8splugin/internal/healthcheck/healthcheck.go214
-rw-r--r--src/k8splugin/internal/healthcheck/kubeclient.go4
-rw-r--r--src/k8splugin/internal/healthcheck/stream.go75
3 files changed, 255 insertions, 38 deletions
diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go
index 341b1dba..cf1d39bd 100644
--- a/src/k8splugin/internal/healthcheck/healthcheck.go
+++ b/src/k8splugin/internal/healthcheck/healthcheck.go
@@ -15,6 +15,8 @@ package healthcheck
import (
"encoding/json"
+ "strings"
+ "time"
protorelease "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/helm/pkg/releasetesting"
@@ -27,22 +29,18 @@ import (
pkgerrors "github.com/pkg/errors"
)
-// HealthcheckState holds possible states of Healthcheck instance
-type HealthcheckState string
-
-const (
- HcS_UNKNOWN HealthcheckState = "UNKNOWN"
- HcS_STARTED HealthcheckState = "STARTED"
- HcS_RUNNING HealthcheckState = "RUNNING"
- HcS_SUCCESS HealthcheckState = "SUCCESS"
- HcS_FAILURE HealthcheckState = "FAILURE"
+var (
+ HcS_UNKNOWN string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_UNKNOWN)]
+ HcS_RUNNING string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_RUNNING)]
+ HcS_SUCCESS string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_SUCCESS)]
+ HcS_FAILURE string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_FAILURE)]
)
// InstanceHCManager interface exposes instance Healthcheck fuctionalities
type InstanceHCManager interface {
- Create(instanceId string) (InstanceHCStatus, error)
+ Create(instanceId string) (InstanceMiniHCStatus, error)
Get(instanceId, healthcheckId string) (InstanceHCStatus, error)
- List(instanceId string) ([]InstanceHCStatus, error)
+ List(instanceId string) (InstanceHCOverview, error)
Delete(instanceId, healthcheckId string) error
}
@@ -71,9 +69,24 @@ type InstanceHCClient struct {
// InstanceHCStatus holds healthcheck status
type InstanceHCStatus struct {
- releasetesting.TestSuite
- Id string
- Status HealthcheckState
+ InstanceId string `json:"instance-id"`
+ HealthcheckId string `json:"healthcheck-id"`
+ Status string `json:"status"`
+ Info string `json:"info"`
+ releasetesting.TestSuite `json:"test-suite"`
+}
+
+// InstanceMiniHCStatus holds only healthcheck summary
+type InstanceMiniHCStatus struct {
+ HealthcheckId string `json:"healthcheck-id"`
+ Status string `json:"status"`
+}
+
+// InstanceHCOverview holds Healthcheck-related data
+type InstanceHCOverview struct {
+ InstanceId string `json:"instance-id"`
+ HCSummary []InstanceMiniHCStatus `json:"healthcheck-summary"`
+ Hooks []*protorelease.Hook `json:"hooks"`
}
func NewHCClient() *InstanceHCClient {
@@ -83,7 +96,11 @@ func NewHCClient() *InstanceHCClient {
}
}
-func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error) {
+func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus {
+ return InstanceMiniHCStatus{ihcs.HealthcheckId, ihcs.Status}
+}
+
+func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) {
//Generate ID
id := namegenerator.Generate()
@@ -91,67 +108,190 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error)
v := app.NewInstanceClient()
instance, err := v.Get(instanceId)
if err != nil {
- return InstanceHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
+ return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
}
//Prepare Environment, Request and Release structs
//TODO In future could derive params from request
client, err := NewKubeClient(instanceId, instance.Request.CloudRegion)
if err != nil {
- return InstanceHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient")
+ return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient")
+ }
+ key := HealthcheckKey{
+ InstanceId: instanceId,
+ HealthcheckId: id,
}
env := &releasetesting.Environment{
Namespace: instance.Namespace,
KubeClient: client,
Parallel: false,
+ Stream: NewStream(key, ihc.storeName, ihc.tagInst),
}
release := protorelease.Release{
Name: instance.ReleaseName,
Hooks: instance.Hooks,
}
- //Run HC
+ //Define HC
testSuite, err := releasetesting.NewTestSuite(&release)
if err != nil {
log.Error("Error creating TestSuite", log.Fields{
"Release": release,
})
- return InstanceHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite")
- }
- if err = testSuite.Run(env); err != nil {
- log.Error("Error running TestSuite", log.Fields{
- "TestSuite": testSuite,
- "Environment": env,
- })
- return InstanceHCStatus{}, pkgerrors.Wrap(err, "Running TestSuite")
+ return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite")
}
//Save state
ihcs := InstanceHCStatus{
- TestSuite: *testSuite,
- Id: id,
- Status: HcS_STARTED,
- }
- key := HealthcheckKey{
- InstanceId: instance.ID,
+ TestSuite: *testSuite,
HealthcheckId: id,
+ InstanceId: instanceId,
+ Status: HcS_RUNNING,
}
err = db.DBconn.Create(ihc.storeName, key, ihc.tagInst, ihcs)
if err != nil {
- return ihcs, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+ return instanceMiniHCStatusFromStatus(ihcs),
+ pkgerrors.Wrap(err, "Creating Instance DB Entry")
}
- return ihcs, nil
+ // Run HC async
+ // If testSuite doesn't fail immediately, we let it continue in background
+ errC := make(chan error, 1)
+ timeout := make(chan bool, 1)
+ // Stream handles updates of testSuite run so we don't need to care
+ var RunAsync func() = func() {
+ err := ihcs.TestSuite.Run(env)
+ if err != nil {
+ log.Error("Error running TestSuite", log.Fields{
+ "TestSuite": ihcs.TestSuite,
+ "Environment": env,
+ "Error": err.Error(),
+ })
+ ihcs.Status = HcS_FAILURE
+ ihcs.Info = ihcs.Info + "\n" + err.Error()
+ }
+ // Send to channel before db update as it can be slow
+ errC <- err
+ // Download latest Status/Info
+ resp, err := ihc.Get(ihcs.InstanceId, ihcs.HealthcheckId)
+ if err != nil {
+ log.Error("Error querying Healthcheck status", log.Fields{"error": err})
+ return
+ }
+ ihcs.Status = resp.Status
+ ihcs.Info = resp.Info
+ // Update DB
+ err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs)
+ if err != nil {
+ log.Error("Error saving Testsuite failure in DB", log.Fields{
+ "InstanceHCStatus": ihcs,
+ "Error": err,
+ })
+ }
+ }
+ go func() {
+ //Time is hardcoded but the only impact is that typically http response is sent after 2s
+ //Could be considered shorter in future
+ time.Sleep(2 * time.Second)
+ timeout <- true
+ }()
+ go RunAsync()
+ select {
+ case err := <-errC:
+ if err == nil {
+ return instanceMiniHCStatusFromStatus(ihcs), nil
+ } else {
+ return instanceMiniHCStatusFromStatus(ihcs), err
+ }
+ case <-timeout:
+ return instanceMiniHCStatusFromStatus(ihcs), nil
+ }
}
func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCStatus, error) {
- return InstanceHCStatus{}, nil
+ key := HealthcheckKey{
+ InstanceId: instanceId,
+ HealthcheckId: healthcheckId,
+ }
+ DBResp, err := db.DBconn.Read(ihc.storeName, key, ihc.tagInst)
+ if err != nil || DBResp == nil {
+ return InstanceHCStatus{}, pkgerrors.Wrap(err, "Error retrieving Healthcheck data")
+ }
+
+ resp := InstanceHCStatus{}
+ err = db.DBconn.Unmarshal(DBResp, &resp)
+ if err != nil {
+ return InstanceHCStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
+ }
+ return resp, nil
}
func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error {
+ v := app.NewInstanceClient()
+ instance, err := v.Get(instanceId)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Getting instance")
+ }
+ client, err := NewKubeClient(instanceId, instance.Request.CloudRegion)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Preparing KubeClient")
+ }
+ env := &releasetesting.Environment{
+ Namespace: instance.Namespace,
+ KubeClient: client,
+ }
+ ihcs, err := ihc.Get(instanceId, healthcheckId)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error querying Healthcheck status")
+ }
+ env.DeleteTestPods(ihcs.TestSuite.TestManifests)
+ key := HealthcheckKey{instanceId, healthcheckId}
+ err = db.DBconn.Delete(ihc.storeName, key, ihc.tagInst)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Removing Healthcheck in DB")
+ }
return nil
}
-func (ihc InstanceHCClient) List(instanceId string) ([]InstanceHCStatus, error) {
- return make([]InstanceHCStatus, 0), nil
+func (ihc InstanceHCClient) List(instanceId string) (InstanceHCOverview, error) {
+ ihco := InstanceHCOverview{
+ InstanceId: instanceId,
+ }
+
+ // Retrieve info about available hooks
+ v := app.NewInstanceClient()
+ instance, err := v.Get(instanceId)
+ if err != nil {
+ return ihco, pkgerrors.Wrap(err, "Getting Instance data")
+ }
+ ihco.Hooks = instance.Hooks
+
+ // Retrieve info about running/completed healthchecks
+ dbResp, err := db.DBconn.ReadAll(ihc.storeName, ihc.tagInst)
+ if err != nil {
+ if !strings.Contains(err.Error(), "Did not find any objects with tag") {
+ return ihco, pkgerrors.Wrap(err, "Getting Healthcheck data")
+ }
+ }
+ miniStatus := make([]InstanceMiniHCStatus, 0)
+ for key, value := range dbResp {
+ //value is a byte array
+ if value != nil {
+ resp := InstanceHCStatus{}
+ err = db.DBconn.Unmarshal(value, &resp)
+ if err != nil {
+ log.Error("Error unmarshaling Instance HC data", log.Fields{
+ "error": err.Error(),
+ "key": key})
+ }
+ //Filter-out healthchecks from other instances
+ if instanceId != resp.InstanceId {
+ continue
+ }
+ miniStatus = append(miniStatus, instanceMiniHCStatusFromStatus(resp))
+ }
+ }
+ ihco.HCSummary = miniStatus
+
+ return ihco, nil
}
diff --git a/src/k8splugin/internal/healthcheck/kubeclient.go b/src/k8splugin/internal/healthcheck/kubeclient.go
index be4c6fcc..2a168a78 100644
--- a/src/k8splugin/internal/healthcheck/kubeclient.go
+++ b/src/k8splugin/internal/healthcheck/kubeclient.go
@@ -25,13 +25,15 @@ import (
//implements environment.KubeClient but overrides it so that
//custom labels can be injected into created resources
-// using internal k8sClient
+//using internal k8sClient
type KubeClientImpl struct {
environment.KubeClient
labels map[string]string
k app.KubernetesClient
}
+var _ environment.KubeClient = KubeClientImpl{}
+
func NewKubeClient(instanceId, cloudRegion string) (*KubeClientImpl, error) {
k8sClient := app.KubernetesClient{}
err := k8sClient.Init(cloudRegion, instanceId)
diff --git a/src/k8splugin/internal/healthcheck/stream.go b/src/k8splugin/internal/healthcheck/stream.go
new file mode 100644
index 00000000..d7c6e654
--- /dev/null
+++ b/src/k8splugin/internal/healthcheck/stream.go
@@ -0,0 +1,75 @@
+/*
+Copyright © 2021 Samsung Electronics
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthcheck
+
+import (
+ "google.golang.org/grpc"
+ "k8s.io/helm/pkg/proto/hapi/release"
+ "k8s.io/helm/pkg/proto/hapi/services"
+
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
+
+ pkgerrors "github.com/pkg/errors"
+)
+
+//implements basic stream implementation that logs progress
+//and updates state in DB
+type StreamImpl struct {
+ Key HealthcheckKey
+ StoreName string
+ Tag string
+ grpc.ServerStream //only to satisfy interface needs, it's not used
+}
+
+var _ services.ReleaseService_RunReleaseTestServer = StreamImpl{}
+
+func NewStream(key HealthcheckKey, store, tag string) *StreamImpl {
+ s := StreamImpl{
+ Key: key,
+ StoreName: store,
+ Tag: tag,
+ }
+ return &s
+}
+
+func (si StreamImpl) Send(m *services.TestReleaseResponse) error {
+ log.Info("Stream message", log.Fields{"msg": m})
+
+ DBResp, err := db.DBconn.Read(si.StoreName, si.Key, si.Tag)
+ if err != nil || DBResp == nil {
+ return pkgerrors.Wrap(err, "Error retrieving Healthcheck data")
+ }
+
+ resp := InstanceHCStatus{}
+ err = db.DBconn.Unmarshal(DBResp, &resp)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Unmarshaling Healthcheck Value")
+ }
+
+ resp.Status = release.TestRun_Status_name[int32(m.Status)]
+ if m.Msg != "" {
+ if resp.Info == "" {
+ resp.Info = m.Msg
+ } else {
+ resp.Info = resp.Info + "\n" + m.Msg
+ }
+ }
+
+ err = db.DBconn.Update(si.StoreName, si.Key, si.Tag, resp)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Updating Healthcheck")
+ }
+ return nil
+}