Diffstat (limited to 'src/k8splugin/internal')
-rw-r--r--  src/k8splugin/internal/app/client.go  162
-rw-r--r--  src/k8splugin/internal/app/client_test.go  2
-rw-r--r--  src/k8splugin/internal/app/config.go  21
-rw-r--r--  src/k8splugin/internal/app/config_backend.go  43
-rw-r--r--  src/k8splugin/internal/app/config_test.go  45
-rw-r--r--  src/k8splugin/internal/app/deploymentutil.go  178
-rw-r--r--  src/k8splugin/internal/app/hook.go  183
-rw-r--r--  src/k8splugin/internal/app/hook_sorter.go  50
-rw-r--r--  src/k8splugin/internal/app/hook_test.go  264
-rw-r--r--  src/k8splugin/internal/app/instance.go  594
-rw-r--r--  src/k8splugin/internal/app/instance_test.go  99
-rw-r--r--  src/k8splugin/internal/app/query.go  108
-rw-r--r--  src/k8splugin/internal/db/etcd.go  19
-rw-r--r--  src/k8splugin/internal/db/etcd_testing.go  12
-rw-r--r--  src/k8splugin/internal/helm/helm.go  3
-rw-r--r--  src/k8splugin/internal/plugin/helpers.go  15
-rw-r--r--  src/k8splugin/internal/plugin/helpers_test.go  2
-rw-r--r--  src/k8splugin/internal/rb/archive.go  3
-rw-r--r--  src/k8splugin/internal/statuscheck/converter.go  69
-rw-r--r--  src/k8splugin/internal/statuscheck/ready.go  393
-rw-r--r--  src/k8splugin/internal/statuscheck/ready_test.go  517
-rw-r--r--  src/k8splugin/internal/statuscheck/resource.go  85
-rw-r--r--  src/k8splugin/internal/statuscheck/resource_test.go  61
-rw-r--r--  src/k8splugin/internal/statuscheck/wait.go  109
-rw-r--r--  src/k8splugin/internal/utils/deploymentutil.go  178
-rw-r--r--  src/k8splugin/internal/utils/utils.go (renamed from src/k8splugin/internal/utils.go)  0
-rw-r--r--  src/k8splugin/internal/utils/utils_test.go (renamed from src/k8splugin/internal/utils_test.go)  2
27 files changed, 3128 insertions(+), 89 deletions(-)
diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go
index 00fd8e97..9813333e 100644
--- a/src/k8splugin/internal/app/client.go
+++ b/src/k8splugin/internal/app/client.go
@@ -1,6 +1,8 @@
/*
Copyright 2018 Intel Corporation.
Copyright © 2021 Samsung Electronics
+Copyright © 2021 Orange
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,6 +20,15 @@ package app
import (
"context"
"io/ioutil"
+ appsv1 "k8s.io/api/apps/v1"
+ //appsv1beta1 "k8s.io/api/apps/v1beta1"
+ //appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ //extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+ //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ //apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"
"time"
@@ -27,10 +38,10 @@ import (
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
+ logger "log"
pkgerrors "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -61,6 +72,137 @@ type ResourceStatus struct {
Status unstructured.Unstructured `json:"status"`
}
+func (k *KubernetesClient) getObjTypeForHook(kind string) (runtime.Object, error) {
+ switch kind {
+ case "Job":
+ return &batchv1.Job{}, nil
+ case "Pod":
+ return &corev1.Pod{}, nil
+ case "Deployment":
+ return &appsv1.Deployment{}, nil
+ case "DaemonSet":
+ return &appsv1.DaemonSet{}, nil
+ case "StatefulSet":
+ return &appsv1.StatefulSet{}, nil
+ default:
+ return nil, pkgerrors.New("kind " + kind + " unknown")
+ }
+}
+
+func (k *KubernetesClient) getRestApi(apiVersion string) (rest.Interface, error) {
+ //based on kubectl api-versions
+ switch apiVersion {
+ case "admissionregistration.k8s.io/v1":
+ return k.clientSet.AdmissionregistrationV1().RESTClient(), nil
+ case "admissionregistration.k8s.io/v1beta1":
+ return k.clientSet.AdmissionregistrationV1beta1().RESTClient(), nil
+ case "apps/v1":
+ return k.clientSet.AppsV1().RESTClient(), nil
+ case "apps/v1beta1":
+ return k.clientSet.AppsV1beta1().RESTClient(), nil
+ case "apps/v1beta2":
+ return k.clientSet.AppsV1beta2().RESTClient(), nil
+ case "authentication.k8s.io/v1":
+ return k.clientSet.AuthenticationV1().RESTClient(), nil
+ case "authentication.k8s.io/v1beta1":
+ return k.clientSet.AuthenticationV1beta1().RESTClient(), nil
+ case "authorization.k8s.io/v1":
+ return k.clientSet.AuthorizationV1().RESTClient(), nil
+ case "authorization.k8s.io/v1beta1":
+ return k.clientSet.AuthorizationV1beta1().RESTClient(), nil
+ case "autoscaling/v1":
+ return k.clientSet.AutoscalingV1().RESTClient(), nil
+ case "autoscaling/v2beta1":
+ return k.clientSet.AutoscalingV2beta1().RESTClient(), nil
+ case "autoscaling/v2beta2":
+ return k.clientSet.AutoscalingV2beta2().RESTClient(), nil
+ case "batch/v1":
+ return k.clientSet.BatchV1().RESTClient(), nil
+ case "batch/v1beta1":
+ return k.clientSet.BatchV1beta1().RESTClient(), nil
+ case "certificates.k8s.io/v1":
+ return k.clientSet.CertificatesV1().RESTClient(), nil
+ case "certificates.k8s.io/v1beta1":
+ return k.clientSet.CertificatesV1beta1().RESTClient(), nil
+ case "coordination.k8s.io/v1":
+ return k.clientSet.CoordinationV1().RESTClient(), nil
+ case "coordination.k8s.io/v1beta1":
+ return k.clientSet.CoordinationV1beta1().RESTClient(), nil
+ case "v1":
+ return k.clientSet.CoreV1().RESTClient(), nil
+ case "discovery.k8s.io/v1beta1":
+ return k.clientSet.DiscoveryV1beta1().RESTClient(), nil
+ case "events.k8s.io/v1":
+ return k.clientSet.EventsV1().RESTClient(), nil
+ case "events.k8s.io/v1beta1":
+ return k.clientSet.EventsV1beta1().RESTClient(), nil
+ case "extensions/v1beta1":
+ return k.clientSet.ExtensionsV1beta1().RESTClient(), nil
+ case "flowcontrol.apiserver.k8s.io/v1alpha1":
+ return k.clientSet.FlowcontrolV1alpha1().RESTClient(), nil
+ case "networking.k8s.io/v1":
+ return k.clientSet.NetworkingV1().RESTClient(), nil
+ case "networking.k8s.io/v1beta1":
+ return k.clientSet.NetworkingV1beta1().RESTClient(), nil
+ case "node.k8s.io/v1alpha1":
+ return k.clientSet.NodeV1alpha1().RESTClient(), nil
+ case "node.k8s.io/v1beta1":
+ return k.clientSet.NodeV1beta1().RESTClient(), nil
+ case "policy/v1beta1":
+ return k.clientSet.PolicyV1beta1().RESTClient(), nil
+ case "rbac.authorization.k8s.io/v1":
+ return k.clientSet.RbacV1().RESTClient(), nil
+ case "rbac.authorization.k8s.io/v1alpha1":
+ return k.clientSet.RbacV1alpha1().RESTClient(), nil
+ case "rbac.authorization.k8s.io/v1beta1":
+ return k.clientSet.RbacV1beta1().RESTClient(), nil
+ case "scheduling.k8s.io/v1":
+ return k.clientSet.SchedulingV1().RESTClient(), nil
+ case "scheduling.k8s.io/v1alpha1":
+ return k.clientSet.SchedulingV1alpha1().RESTClient(), nil
+ case "scheduling.k8s.io/v1beta1":
+ return k.clientSet.SchedulingV1beta1().RESTClient(), nil
+ case "storage.k8s.io/v1":
+ return k.clientSet.StorageV1().RESTClient(), nil
+ case "storage.k8s.io/v1alpha1":
+ return k.clientSet.StorageV1alpha1().RESTClient(), nil
+ case "storage.k8s.io/v1beta1":
+ return k.clientSet.StorageV1beta1().RESTClient(), nil
+ default:
+ return nil, pkgerrors.New("Api version " + apiVersion + " unknown")
+ }
+}
+
+func (k *KubernetesClient) WatchHookUntilReady(timeout time.Duration, ns string, res helm.KubernetesResource) error {
+ //For now, only the generic plugin has a dedicated WatchUntilReady implementation. Later, we can implement this function
+ //for each plugin separately.
+ pluginImpl, err := plugin.GetPluginByKind("generic")
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error loading plugin")
+ }
+
+ mapper := k.GetMapper()
+ apiVersion, kind := res.GVK.ToAPIVersionAndKind()
+ if apiVersion == "" {
+ //apiVersion is empty -> we can assume that the rss is ready
+ logger.Printf("apiVersion is empty, assuming the rss is ready")
+ return nil
+ }
+ objType, err := k.getObjTypeForHook(kind)
+ if err != nil {
+ //getObjTypeForHook returned an error -> this kind is not handled by hooks -> consider it ready
+ return nil
+ }
+
+ logger.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
+ restClient, err := k.getRestApi(apiVersion)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Get rest client")
+ }
+
+ return pluginImpl.WatchUntilReady(timeout, ns, res, mapper, restClient, objType, k.clientSet)
+}
+
// getPodsByLabel yields status of all pods under given instance ID
func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
client := k.GetStandardClient().CoreV1().Pods(namespace)
@@ -121,9 +263,11 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names
return nil, pkgerrors.Wrap(err, "Querying for resources")
}
- resp := make([]ResourceStatus, len(unstrList.Items))
+ resp := make([]ResourceStatus, 0)
for _, unstr := range unstrList.Items {
- resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
+ if unstr.GetName() != "" {
+ resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
+ }
}
return resp, nil
}
@@ -276,8 +420,7 @@ func (k *KubernetesClient) ensureNamespace(namespace string) error {
return nil
}
-func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
- namespace string) (helm.KubernetesResource, error) {
+func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) {
if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
@@ -317,7 +460,7 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
namespace string) (helm.KubernetesResource, error) {
if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
- return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
+ return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + " does not exists")
}
log.Info("Processing Kubernetes Resource", log.Fields{
@@ -353,16 +496,17 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
namespace string) ([]helm.KubernetesResource, error) {
+ var createdResources []helm.KubernetesResource
+
err := k.ensureNamespace(namespace)
if err != nil {
- return nil, pkgerrors.Wrap(err, "Creating Namespace")
+ return createdResources, pkgerrors.Wrap(err, "Creating Namespace")
}
- var createdResources []helm.KubernetesResource
for _, resTempl := range sortedTemplates {
resCreated, err := k.CreateKind(resTempl, namespace)
if err != nil {
- return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
+ return createdResources, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
}
createdResources = append(createdResources, resCreated)
}
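
For illustration, a minimal caller sketch of the new hook-watching path (names such as k8sClient and the job name are assumptions, not part of this patch):

    res := helm.KubernetesResource{
        GVK:  schema.FromAPIVersionAndKind("batch/v1", "Job"),
        Name: "preinstall-job",
    }
    // WatchHookUntilReady resolves a typed REST client via getRestApi and the
    // object type via getObjTypeForHook, then delegates to the generic plugin.
    if err := k8sClient.WatchHookUntilReady(60*time.Second, "default", res); err != nil {
        log.Printf("hook did not become ready: %s", err)
    }
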
diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go
index 6db541a4..0ba244d2 100644
--- a/src/k8splugin/internal/app/client_test.go
+++ b/src/k8splugin/internal/app/client_test.go
@@ -15,13 +15,13 @@ package app
import (
"encoding/base64"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
"io/ioutil"
"os"
"plugin"
"reflect"
"testing"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
diff --git a/src/k8splugin/internal/app/config.go b/src/k8splugin/internal/app/config.go
index d0f8876d..94acadcc 100644
--- a/src/k8splugin/internal/app/config.go
+++ b/src/k8splugin/internal/app/config.go
@@ -42,7 +42,7 @@ type ConfigResult struct {
ProfileName string `json:"profile-name"`
ConfigName string `json:"config-name"`
TemplateName string `json:"template-name"`
- ConfigVersion uint `json:"config-verion"`
+ ConfigVersion uint `json:"config-version"`
}
//ConfigRollback input
@@ -62,6 +62,7 @@ type ConfigTagit struct {
type ConfigManager interface {
Create(instanceID string, p Config) (ConfigResult, error)
Get(instanceID, configName string) (Config, error)
+ List(instanceID string) ([]Config, error)
Help() map[string]string
Update(instanceID, configName string, p Config) (ConfigResult, error)
Delete(instanceID, configName string) (ConfigResult, error)
@@ -225,6 +226,24 @@ func (v *ConfigClient) Get(instanceID, configName string) (Config, error) {
return cfg, nil
}
+// List returns all Config entries for the given instance from the database
+func (v *ConfigClient) List(instanceID string) ([]Config, error) {
+
+ // Acquire per profile Mutex
+ lock, _ := getProfileData(instanceID)
+ lock.Lock()
+ defer lock.Unlock()
+ // Read Config DB
+ cs := ConfigStore{
+ instanceID: instanceID,
+ }
+ cfg, err := cs.getConfigList()
+ if err != nil {
+ return []Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
+ }
+ return cfg, nil
+}
+
// Delete the Config from database
func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
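
A hedged usage sketch of the new List API (the instance ID is an illustrative assumption):

    client := NewConfigClient()
    // List returns every Config stored under the instance's etcd config key.
    configs, err := client.List("inst-12345")
    if err != nil {
        log.Printf("listing configs: %s", err)
        return
    }
    for _, cfg := range configs {
        log.Printf("found config %s", cfg.ConfigName)
    }
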
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go
index e2f802c7..30a480df 100644
--- a/src/k8splugin/internal/app/config_backend.go
+++ b/src/k8splugin/internal/app/config_backend.go
@@ -170,6 +170,33 @@ func (c ConfigStore) getConfig() (Config, error) {
return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
}
+// getConfigList reads all config entries for the instance from the database
+func (c ConfigStore) getConfigList() ([]Config, error) {
+ rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+ if err != nil {
+ return []Config{}, pkgerrors.Wrap(err, "Retrieving model info")
+ }
+ cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig)
+ values, err := db.Etcd.GetAll(cfgKey)
+ if err != nil {
+ return []Config{}, pkgerrors.Wrap(err, "Get Config DB List")
+ }
+ //values is a list of byte arrays
+ if values != nil {
+ result := make([]Config, 0)
+ for _, value := range values {
+ cfg := Config{}
+ err = db.DeSerialize(string(value), &cfg)
+ if err != nil {
+ return []Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value")
+ }
+ result = append(result, cfg)
+ }
+ return result, nil
+ }
+ return []Config{}, pkgerrors.Wrap(err, "Get Config DB List")
+}
+
// Delete the config entry in the database
func (c ConfigStore) deleteConfig() (Config, error) {
@@ -353,12 +380,12 @@ func (c ConfigVersionStore) decrementVersion() error {
// Apply Config
func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
- rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
+ rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID)
if err != nil {
return pkgerrors.Wrap(err, "Retrieving model info")
}
// Get Template and Resolve the template with values
- crl, err := resolve(rbName, rbVersion, profileName, p)
+ crl, err := resolve(rbName, rbVersion, profileName, p, releaseName)
if err != nil {
return pkgerrors.Wrap(err, "Resolve Config")
}
@@ -436,7 +463,7 @@ func scheduleResources(c chan configResourceList) {
//Resolve returns the path where the helm chart merged with
//configuration overrides resides.
-var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+var resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) {
var resTemplates []helm.KubernetesResourceTemplate
@@ -483,9 +510,17 @@ var resolve = func(rbName, rbVersion, profileName string, p Config) (configResou
return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template")
}
+ var finalReleaseName string
+
+ if releaseName == "" {
+ finalReleaseName = profile.ReleaseName
+ } else {
+ finalReleaseName = releaseName
+ }
+
helmClient := helm.NewTemplateClient(profile.KubernetesVersion,
profile.Namespace,
- profile.ReleaseName)
+ finalReleaseName)
chartPath := filepath.Join(chartBasePath, t.ChartName)
resTemplates, _, err = helmClient.GenerateKubernetesArtifacts(chartPath,
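
The release-name fallback above can be summarized in this simplified sketch (variable names assumed): the instance's stored release name wins, and the profile default remains the fallback so records written before this change still resolve.

    name := releaseName
    if name == "" {
        // Older DB records carry no per-instance release name.
        name = profile.ReleaseName
    }
    helmClient := helm.NewTemplateClient(profile.KubernetesVersion, profile.Namespace, name)
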
diff --git a/src/k8splugin/internal/app/config_test.go b/src/k8splugin/internal/app/config_test.go
index 028895d7..9ee96881 100644
--- a/src/k8splugin/internal/app/config_test.go
+++ b/src/k8splugin/internal/app/config_test.go
@@ -19,10 +19,11 @@ package app
import (
"fmt"
- "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
"reflect"
"strings"
"testing"
+
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
// pkgerrors "github.com/pkg/errors"
)
@@ -90,7 +91,7 @@ func TestCreateConfig(t *testing.T) {
db.Etcd = testCase.mockdb
db.DBconn = provideMockModelData(testCase.instanceID, testCase.rbName,
testCase.rbVersion, testCase.profileName)
- resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+ resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) {
return configResourceList{}, nil
}
impl := NewConfigClient()
@@ -104,7 +105,7 @@ func TestCreateConfig(t *testing.T) {
}
} else {
if reflect.DeepEqual(testCase.expected, got) == false {
- t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ t.Errorf("Create returned unexpected body: got %v;"+
" expected %v", got, testCase.expected)
}
}
@@ -203,7 +204,7 @@ func TestRollbackConfig(t *testing.T) {
db.Etcd = testCase.mockdb
db.DBconn = provideMockModelData(testCase.instanceID, testCase.rbName,
testCase.rbVersion, testCase.profileName)
- resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
+ resolve = func(rbName, rbVersion, profileName string, p Config, releaseName string) (configResourceList, error) {
return configResourceList{}, nil
}
impl := NewConfigClient()
@@ -217,10 +218,38 @@ func TestRollbackConfig(t *testing.T) {
}
} else {
if reflect.DeepEqual(testCase.expected1, got) == false {
- t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ t.Errorf("Create returned unexpected body: got %v;"+
" expected %v", got, testCase.expected1)
}
}
+ get, err := impl.Get(testCase.instanceID, testCase.inp.ConfigName)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("Get returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("Get returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual(testCase.inp, get) == false {
+ t.Errorf("Get returned unexpected body: got %v;"+
+ " expected %v", get, testCase.inp)
+ }
+ }
+ getList, err := impl.List(testCase.instanceID)
+ if err != nil {
+ if testCase.expectedError == "" {
+ t.Fatalf("List returned an unexpected error %s", err)
+ }
+ if strings.Contains(err.Error(), testCase.expectedError) == false {
+ t.Fatalf("List returned an unexpected error %s", err)
+ }
+ } else {
+ if reflect.DeepEqual([]Config{testCase.inp}, getList) == false {
+ t.Errorf("List returned unexpected body: got %v;"+
+ " expected %v", getList, []Config{testCase.inp})
+ }
+ }
got, err = impl.Update(testCase.instanceID, testCase.inp.ConfigName, testCase.inpUpdate1)
if err != nil {
if testCase.expectedError == "" {
@@ -231,7 +260,7 @@ func TestRollbackConfig(t *testing.T) {
}
} else {
if reflect.DeepEqual(testCase.expected2, got) == false {
- t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ t.Errorf("Create returned unexpected body: got %v;"+
" expected %v", got, testCase.expected2)
}
}
@@ -245,7 +274,7 @@ func TestRollbackConfig(t *testing.T) {
}
} else {
if reflect.DeepEqual(testCase.expected3, got) == false {
- t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ t.Errorf("Create returned unexpected body: got %v;"+
" expected %v", got, testCase.expected3)
}
}
@@ -259,7 +288,7 @@ func TestRollbackConfig(t *testing.T) {
}
} else {
if reflect.DeepEqual(testCase.expected4, got) == false {
- t.Errorf("Create Resource Bundle returned unexpected body: got %v;"+
+ t.Errorf("Create returned unexpected body: got %v;"+
" expected %v", got, testCase.expected4)
}
}
diff --git a/src/k8splugin/internal/app/deploymentutil.go b/src/k8splugin/internal/app/deploymentutil.go
new file mode 100644
index 00000000..e945b055
--- /dev/null
+++ b/src/k8splugin/internal/app/deploymentutil.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+ "context"
+ "sort"
+
+ apps "k8s.io/api/apps/v1"
+ v1 "k8s.io/api/core/v1"
+ apiequality "k8s.io/apimachinery/pkg/api/equality"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ intstrutil "k8s.io/apimachinery/pkg/util/intstr"
+ appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
+)
+
+// deploymentutil contains a copy of a few functions from Kubernetes controller code to avoid a dependency on k8s.io/kubernetes.
+// This code is copied from https://github.com/kubernetes/kubernetes/blob/e856613dd5bb00bcfaca6974431151b5c06cbed5/pkg/controller/deployment/util/deployment_util.go
+// No changes to the code were made other than removing some unused functions
+
+// RsListFunc returns the ReplicaSet from the ReplicaSet namespace and the List metav1.ListOptions.
+type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error)
+
+// ListReplicaSets returns a slice of RSes the given deployment targets.
+// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
+// because only the controller itself should do that.
+// However, it does filter out anything whose ControllerRef doesn't match.
+func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) {
+ // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
+ // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
+ namespace := deployment.Namespace
+ selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
+ if err != nil {
+ return nil, err
+ }
+ options := metav1.ListOptions{LabelSelector: selector.String()}
+ all, err := getRSList(namespace, options)
+ if err != nil {
+ return nil, err
+ }
+ // Only include those whose ControllerRef matches the Deployment.
+ owned := make([]*apps.ReplicaSet, 0, len(all))
+ for _, rs := range all {
+ if metav1.IsControlledBy(rs, deployment) {
+ owned = append(owned, rs)
+ }
+ }
+ return owned, nil
+}
+
+// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
+type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet
+
+func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
+func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
+ if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
+ return o[i].Name < o[j].Name
+ }
+ return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
+}
+
+// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
+func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet {
+ sort.Sort(ReplicaSetsByCreationTimestamp(rsList))
+ for i := range rsList {
+ if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
+ // In rare cases, such as after cluster upgrades, Deployment may end up with
+ // having more than one new ReplicaSets that have the same template as its template,
+ // see https://github.com/kubernetes/kubernetes/issues/40415
+ // We deterministically choose the oldest new ReplicaSet.
+ return rsList[i]
+ }
+ }
+ // new ReplicaSet does not exist.
+ return nil
+}
+
+// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
+// We ignore pod-template-hash because:
+// 1. The hash result would be different upon podTemplateSpec API changes
+// (e.g. the addition of a new field will cause the hash code to change)
+// 2. The deployment template won't have hash labels
+func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool {
+ t1Copy := template1.DeepCopy()
+ t2Copy := template2.DeepCopy()
+ // Remove hash labels from template.Labels before comparing
+ delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
+ delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
+ return apiequality.Semantic.DeepEqual(t1Copy, t2Copy)
+}
+
+// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface.
+// Returns nil if the new replica set doesn't exist yet.
+func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) {
+ rsList, err := ListReplicaSets(deployment, RsListFromClient(c))
+ if err != nil {
+ return nil, err
+ }
+ return FindNewReplicaSet(deployment, rsList), nil
+}
+
+// RsListFromClient returns an rsListFunc that wraps the given client.
+func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc {
+ return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) {
+ rsList, err := c.ReplicaSets(namespace).List(context.Background(), options)
+ if err != nil {
+ return nil, err
+ }
+ var ret []*apps.ReplicaSet
+ for i := range rsList.Items {
+ ret = append(ret, &rsList.Items[i])
+ }
+ return ret, err
+ }
+}
+
+// IsRollingUpdate returns true if the strategy type is a rolling update.
+func IsRollingUpdate(deployment *apps.Deployment) bool {
+ return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType
+}
+
+// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
+func MaxUnavailable(deployment apps.Deployment) int32 {
+ if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 {
+ return int32(0)
+ }
+ // Error caught by validation
+ _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
+ if maxUnavailable > *deployment.Spec.Replicas {
+ return *deployment.Spec.Replicas
+ }
+ return maxUnavailable
+}
+
+// ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
+// step. For example:
+//
+// 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1)
+// 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1)
+// 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
+// 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1)
+// 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
+// 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1)
+func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) {
+ surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true)
+ if err != nil {
+ return 0, 0, err
+ }
+ unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if surge == 0 && unavailable == 0 {
+ // Validation should never allow the user to explicitly use zero values for both maxSurge
+ // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
+ // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the
+ // theory that surge might not work due to quota.
+ unavailable = 1
+ }
+
+ return int32(surge), int32(unavailable), nil
+}
diff --git a/src/k8splugin/internal/app/hook.go b/src/k8splugin/internal/app/hook.go
new file mode 100644
index 00000000..ebf5f8e3
--- /dev/null
+++ b/src/k8splugin/internal/app/hook.go
@@ -0,0 +1,183 @@
+/*
+Copyright © 2021 Nokia Bell Labs
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+ "fmt"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+ "helm.sh/helm/v3/pkg/release"
+ "log"
+ "strings"
+ "time"
+)
+
+// Timeout used when deleting resources with a hook-delete-policy.
+const defaultHookDeleteTimeoutInSeconds = int64(60)
+
+// HookClient implements the Helm Hook interface
+type HookClient struct {
+ kubeNameSpace string
+ id string
+ dbStoreName string
+ dbTagInst string
+}
+
+type MultiCloudHook struct {
+ release.Hook
+ Group string
+ Version string
+}
+
+// NewHookClient returns a new instance of HookClient
+func NewHookClient(namespace, id, dbStoreName, dbTagInst string) *HookClient {
+ return &HookClient{
+ kubeNameSpace: namespace,
+ id: id,
+ dbStoreName: dbStoreName,
+ dbTagInst: dbTagInst,
+ }
+}
+
+func (hc *HookClient) getHookByEvent(hs []*helm.Hook, hook release.HookEvent) []*helm.Hook {
+ hooks := []*helm.Hook{}
+ for _, h := range hs {
+ for _, e := range h.Hook.Events {
+ if e == hook {
+ hooks = append(hooks, h)
+ }
+ }
+ }
+ return hooks
+}
+
+// Mimic function ExecHook in helm/pkg/tiller/release_server.go
+func (hc *HookClient) ExecHook(
+ k8sClient KubernetesClient,
+ hs []*helm.Hook,
+ hook release.HookEvent,
+ timeout int64,
+ startIndex int,
+ dbData *InstanceDbData) error {
+ executingHooks := hc.getHookByEvent(hs, hook)
+ key := InstanceKey{
+ ID: hc.id,
+ }
+ log.Printf("Executing %d %s hook(s) for instance %s", len(executingHooks), hook, hc.id)
+ executingHooks = sortByHookWeight(executingHooks)
+
+ for index, h := range executingHooks {
+ if index < startIndex {
+ continue
+ }
+ // Set default delete policy to before-hook-creation
+ if h.Hook.DeletePolicies == nil || len(h.Hook.DeletePolicies) == 0 {
+ h.Hook.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation}
+ }
+ if err := hc.deleteHookByPolicy(h, release.HookBeforeHookCreation, k8sClient); err != nil {
+ return err
+ }
+ //update DB here before creating the hook: if the plugin quits,
+ //then when it comes back it will continue from the next hook, treating this one as done
+ if dbData != nil {
+ dbData.HookProgress = fmt.Sprintf("%d/%d", index + 1, len(executingHooks))
+ err := db.DBconn.Update(hc.dbStoreName, key, hc.dbTagInst, dbData)
+ if err != nil {
+ return err
+ }
+ }
+ log.Printf(" Instance: %s, Creating %s hook %s, index %d", hc.id, hook, h.Hook.Name, index)
+ resTempl := helm.KubernetesResourceTemplate{
+ GVK: h.KRT.GVK,
+ FilePath: h.KRT.FilePath,
+ }
+ createdHook, err := k8sClient.CreateKind(resTempl, hc.kubeNameSpace)
+ if err != nil {
+ log.Printf(" Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err)
+ hc.deleteHookByPolicy(h, release.HookFailed, k8sClient)
+ return err
+ }
+ if hook != "crd-install" {
+ //timeout <= 0 -> do not wait
+ if timeout > 0 {
+ // Watch hook resources until they are completed
+ err = k8sClient.WatchHookUntilReady(time.Duration(timeout)*time.Second, hc.kubeNameSpace, createdHook)
+ if err != nil {
+ // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted
+ // under failed condition. If so, then clear the corresponding resource object in the hook
+ if err := hc.deleteHookByPolicy(h, release.HookFailed, k8sClient); err != nil {
+ return err
+ }
+ return err
+ }
+ }
+ } else {
+ //Do not handle CRD Hooks
+ }
+ }
+
+ for _, h := range executingHooks {
+ if err := hc.deleteHookByPolicy(h, release.HookSucceeded, k8sClient); err != nil {
+ log.Printf(" Instance: %s, Warning: Error deleting %s hook %s based on delete policy, continue", hc.id, hook, h.Hook.Name)
+ return err
+ }
+ }
+ log.Printf("%d %s hook(s) complete for release %s", len(executingHooks), hook, hc.id)
+ return nil
+}
+
+func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error {
+ rss := helm.KubernetesResource{
+ GVK: h.KRT.GVK,
+ Name: h.Hook.Name,
+ }
+ if hookHasDeletePolicy(h, policy) {
+ log.Printf(" Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy)
+ if errHookDelete := k8sClient.deleteResources(append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil {
+ if strings.Contains(errHookDelete.Error(), "not found") {
+ return nil
+ } else {
+ log.Printf(" Instance: %s, Warning: hook %s, filePath %s could not be deleted: %s", hc.id, h.Hook.Name, h.KRT.FilePath ,errHookDelete)
+ return errHookDelete
+ }
+ } else {
+ //Verify that the rss is deleted
+ isDeleted := false
+ for !isDeleted {
+ log.Printf(" Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
+ if _, err := k8sClient.GetResourceStatus(rss, hc.kubeNameSpace); err != nil {
+ if strings.Contains(err.Error(), "not found") {
+ log.Printf(" Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
+ return nil
+ } else {
+ isDeleted = true
+ }
+ }
+ time.Sleep(5 * time.Second)
+ }
+ }
+ }
+ return nil
+}
+
+// hookHasDeletePolicy determines whether the defined hook deletion policy matches the hook deletion policies
+// supported by helm. If so, mark the hook as one that should be deleted.
+func hookHasDeletePolicy(h *helm.Hook, policy release.HookDeletePolicy) bool {
+ for _, v := range h.Hook.DeletePolicies {
+ if policy == v {
+ return true
+ }
+ }
+ return false
+}
\ No newline at end of file
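
For orientation, a sketch of how a caller drives these hooks (k8sClient, hookList and dbData are assumed from the caller's context; this mirrors the instance.go usage further below):

    hookClient := NewHookClient(profile.Namespace, id, "rbdef", "instance")
    // Run pre-install hooks from index 0, persisting progress in dbData.
    if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 {
        if err := hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, 60, 0, &dbData); err != nil {
            return err
        }
    }
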
diff --git a/src/k8splugin/internal/app/hook_sorter.go b/src/k8splugin/internal/app/hook_sorter.go
new file mode 100644
index 00000000..fa6a9830
--- /dev/null
+++ b/src/k8splugin/internal/app/hook_sorter.go
@@ -0,0 +1,50 @@
+/*
+Copyright © 2021 Nokia Bell Labs
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+ sortLib "sort"
+)
+
+// sortByHookWeight does an in-place sort of hooks by their supplied weight.
+func sortByHookWeight(hooks []*helm.Hook) []*helm.Hook {
+ hs := newHookWeightSorter(hooks)
+ sortLib.Sort(hs)
+ return hs.hooks
+}
+
+type hookWeightSorter struct {
+ hooks []*helm.Hook
+}
+
+func newHookWeightSorter(h []*helm.Hook) *hookWeightSorter {
+ return &hookWeightSorter{
+ hooks: h,
+ }
+}
+
+func (hs *hookWeightSorter) Len() int { return len(hs.hooks) }
+
+func (hs *hookWeightSorter) Swap(i, j int) {
+ hs.hooks[i], hs.hooks[j] = hs.hooks[j], hs.hooks[i]
+}
+
+func (hs *hookWeightSorter) Less(i, j int) bool {
+ if hs.hooks[i].Hook.Weight == hs.hooks[j].Hook.Weight {
+ return hs.hooks[i].Hook.Name < hs.hooks[j].Hook.Name
+ }
+ return hs.hooks[i].Hook.Weight < hs.hooks[j].Hook.Weight
+}
+
diff --git a/src/k8splugin/internal/app/hook_test.go b/src/k8splugin/internal/app/hook_test.go
new file mode 100644
index 00000000..9c63194e
--- /dev/null
+++ b/src/k8splugin/internal/app/hook_test.go
@@ -0,0 +1,264 @@
+/*
+Copyright © 2021 Nokia Bell Labs.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+ "encoding/base64"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+ "helm.sh/helm/v3/pkg/release"
+ "helm.sh/helm/v3/pkg/time"
+ "io/ioutil"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "testing"
+)
+
+func generateHookList() []*helm.Hook {
+ var hookList []*helm.Hook
+ preInstallHook1 := helm.Hook{
+ Hook: release.Hook{
+ Name : "preinstall1",
+ Kind : "Job",
+ Path : "",
+ Manifest : "",
+ Events : []release.HookEvent{release.HookPreInstall},
+ LastRun : release.HookExecution{
+ StartedAt: time.Now(),
+ CompletedAt: time.Now(),
+ Phase: "",
+ },
+ Weight : -5,
+ DeletePolicies : []release.HookDeletePolicy{},
+ },
+ KRT: helm.KubernetesResourceTemplate{
+ GVK: schema.GroupVersionKind{
+ Group: "batch",
+ Version: "v1",
+ Kind: "Job",
+ },
+ FilePath: "../../mock_files/mock_yamls/job.yaml",
+ },
+ }
+ preInstallHook2 := helm.Hook{
+ Hook: release.Hook{
+ Name : "preinstall2",
+ Kind : "Deployment",
+ Path : "",
+ Manifest : "",
+ Events : []release.HookEvent{release.HookPreInstall},
+ LastRun : release.HookExecution{
+ StartedAt: time.Now(),
+ CompletedAt: time.Now(),
+ Phase: "",
+ },
+ Weight : 0,
+ DeletePolicies : []release.HookDeletePolicy{},
+ },
+ KRT: helm.KubernetesResourceTemplate{
+ GVK: schema.GroupVersionKind{
+ Group: "batch",
+ Version: "v1",
+ Kind: "Job",
+ },
+ FilePath: "../../mock_files/mock_yamls/job.yaml",
+ },
+ }
+ postInstallHook := helm.Hook{
+ Hook: release.Hook{
+ Name : "postinstall",
+ Kind : "Job",
+ Path : "",
+ Manifest : "",
+ Events : []release.HookEvent{release.HookPostInstall},
+ LastRun : release.HookExecution{
+ StartedAt: time.Now(),
+ CompletedAt: time.Now(),
+ Phase: "",
+ },
+ Weight : -5,
+ DeletePolicies : []release.HookDeletePolicy{},
+ },
+ KRT: helm.KubernetesResourceTemplate{
+ GVK: schema.GroupVersionKind{
+ Group: "batch",
+ Version: "v1",
+ Kind: "Job",
+ },
+ FilePath: "../../mock_files/mock_yamls/job.yaml",
+ },
+ }
+ preDeleteHook := helm.Hook{
+ Hook: release.Hook{
+ Name : "predelete",
+ Kind : "Job",
+ Path : "",
+ Manifest : "",
+ Events : []release.HookEvent{release.HookPreDelete},
+ LastRun : release.HookExecution{
+ StartedAt: time.Now(),
+ CompletedAt: time.Now(),
+ Phase: "",
+ },
+ Weight : -5,
+ DeletePolicies : []release.HookDeletePolicy{},
+ },
+ KRT: helm.KubernetesResourceTemplate{
+ GVK: schema.GroupVersionKind{
+ Group: "batch",
+ Version: "v1",
+ Kind: "Job",
+ },
+ FilePath: "../../mock_files/mock_yamls/job.yaml",
+ },
+ }
+ postDeleteHook := helm.Hook{
+ Hook: release.Hook{
+ Name : "postdelete",
+ Kind : "Job",
+ Path : "",
+ Manifest : "",
+ Events : []release.HookEvent{release.HookPostDelete},
+ LastRun : release.HookExecution{
+ StartedAt: time.Now(),
+ CompletedAt: time.Now(),
+ Phase: "",
+ },
+ Weight : -5,
+ DeletePolicies : []release.HookDeletePolicy{},
+ },
+ KRT: helm.KubernetesResourceTemplate{
+ GVK: schema.GroupVersionKind{
+ Group: "batch",
+ Version: "v1",
+ Kind: "Job",
+ },
+ FilePath: "../../mock_files/mock_yamls/job.yaml",
+ },
+ }
+ hookList = append(hookList, &preInstallHook2)
+ hookList = append(hookList, &preInstallHook1)
+ hookList = append(hookList, &postInstallHook)
+ hookList = append(hookList, &preDeleteHook)
+ hookList = append(hookList, &postDeleteHook)
+
+ return hookList
+}
+
+func TestGetHookByEvent(t *testing.T) {
+ hookList := generateHookList()
+ hookClient := NewHookClient("test", "test", "rbdef", "instance")
+ t.Run("Get pre-install hook", func(t *testing.T) {
+ preinstallList := hookClient.getHookByEvent(hookList, release.HookPreInstall)
+ if len(preinstallList) != 2 {
+ t.Fatalf("TestGetHookByEvent error: expected=2 preinstall hook, result= %d", len(preinstallList))
+ }
+ if preinstallList[0].Hook.Name != "preinstall2" {
+ t.Fatalf("TestGetHookByEvent error: expect name of 1st preinstall hook is preinstall2, result= %s", preinstallList[0].Hook.Name)
+ }
+ if preinstallList[1].Hook.Name != "preinstall1" {
+ t.Fatalf("TestGetHookByEvent error: expect name of 2nd preinstall hook is preinstall1, result= %s", preinstallList[0].Hook.Name)
+ }
+ })
+ t.Run("Get post-install hook", func(t *testing.T) {
+ postinstallList := hookClient.getHookByEvent(hookList, release.HookPostInstall)
+ if len(postinstallList) != 1 {
+ t.Fatalf("TestGetHookByEvent error: expected=1 postinstall hook, result= %d", len(postinstallList))
+ }
+ if postinstallList[0].Hook.Name != "postinstall" {
+ t.Fatalf("TestGetHookByEvent error: expect name of 1st postinstall hook is postinstall, result= %s", postinstallList[0].Hook.Name)
+ }
+ })
+ t.Run("Get pre-delete hook", func(t *testing.T) {
+ predeleteList := hookClient.getHookByEvent(hookList, release.HookPreDelete)
+ if len(predeleteList) != 1 {
+ t.Fatalf("TestGetHookByEvent error: expected=1 predelete hook, result= %d", len(predeleteList))
+ }
+ if predeleteList[0].Hook.Name != "predelete" {
+ t.Fatalf("TestGetHookByEvent error: expect name of 1st predelete hook is predelete, result= %s", predeleteList[0].Hook.Name)
+ }
+ })
+ t.Run("Get post-delete hook", func(t *testing.T) {
+ postdeleteList := hookClient.getHookByEvent(hookList, release.HookPostDelete)
+ if len(postdeleteList) != 1 {
+ t.Fatalf("TestGetHookByEvent error: expected=1 postdelete hook, result= %d", len(postdeleteList))
+ }
+ if postdeleteList[0].Hook.Name != "postdelete" {
+ t.Fatalf("TestGetHookByEvent error: expect name of 1st postdelete hook is postdelete, result= %s", postdeleteList[0].Hook.Name)
+ }
+ })
+}
+
+func TestSortHook(t *testing.T) {
+ hookList := generateHookList()
+ hookClient := NewHookClient("test", "test", "rbdef", "instance")
+ preinstallList := hookClient.getHookByEvent(hookList, release.HookPreInstall)
+ t.Run("Sort pre-install hooks", func(t *testing.T) {
+ sortedHooks := sortByHookWeight(preinstallList)
+ if sortedHooks[0].Hook.Name != "preinstall1" {
+ t.Fatalf("TestSortHook error: expect name of 1st preinstall hook is preinstall1, result= %s", sortedHooks[0].Hook.Name)
+ }
+ if sortedHooks[1].Hook.Name != "preinstall2" {
+ t.Fatalf("TestSortHook error: expect name of 2nd preinstall hook is preinstall2, result= %s", sortedHooks[1].Hook.Name)
+ }
+ })
+}
+
+func TestExecHook(t *testing.T) {
+ hookList := generateHookList()
+ hookClient := NewHookClient("test", "test", "rbdef", "instance")
+ err := LoadMockPlugins(utils.LoadedPlugins)
+ if err != nil {
+ t.Fatalf("LoadMockPlugins returned an error (%s)", err)
+ }
+
+ // Load the mock kube config file into memory
+ fd, err := ioutil.ReadFile("../../mock_files/mock_configs/mock_kube_config")
+ if err != nil {
+ t.Fatal("Unable to read mock_kube_config")
+ }
+ db.DBconn = &db.MockDB{
+ Items: map[string]map[string][]byte{
+ connection.ConnectionKey{CloudRegion: "mock_connection"}.String(): {
+ "metadata": []byte(
+ "{\"cloud-region\":\"mock_connection\"," +
+ "\"cloud-owner\":\"mock_owner\"," +
+ "\"kubeconfig\": \"" + base64.StdEncoding.EncodeToString(fd) + "\"}"),
+ },
+ },
+ }
+
+ k8sClient := KubernetesClient{}
+ err = k8sClient.Init("mock_connection", "test")
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall,10,0, nil)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall,10,0, nil)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete,10,0, nil)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete,10,0, nil)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+}
\ No newline at end of file
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index c1ec35b6..1c9c81a9 100644
--- a/src/k8splugin/internal/app/instance.go
+++ b/src/k8splugin/internal/app/instance.go
@@ -1,6 +1,8 @@
/*
* Copyright 2018 Intel Corporation, Inc
* Copyright © 2021 Samsung Electronics
+ * Copyright © 2021 Orange
+ * Copyright © 2021 Nokia Bell Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,16 +20,29 @@
package app
import (
+ "context"
"encoding/json"
+ "log"
+ "strings"
+ "strconv"
+ "time"
+
+ appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/resource"
+
"github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "log"
- "strings"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
pkgerrors "github.com/pkg/errors"
+ "helm.sh/helm/v3/pkg/release"
)
// InstanceRequest contains the parameters needed for instantiation
@@ -52,6 +67,22 @@ type InstanceResponse struct {
Hooks []*helm.Hook `json:"-"`
}
+// InstanceDbData contains the data to put to Db
+type InstanceDbData struct {
+ ID string `json:"id"`
+ Request InstanceRequest `json:"request"`
+ Namespace string `json:"namespace"`
+ Status string `json:"status"`
+ ReleaseName string `json:"release-name"`
+ Resources []helm.KubernetesResource `json:"resources"`
+ Hooks []*helm.Hook `json:"hooks"`
+ HookProgress string `json:"hook-progress"`
+ PreInstallTimeout int64 `json:"PreInstallTimeout"`
+ PostInstallTimeout int64 `json:"PostInstallTimeout"`
+ PreDeleteTimeout int64 `json:"PreDeleteTimeout"`
+ PostDeleteTimeout int64 `json:"PostDeleteTimeout"`
+}
+
// InstanceMiniResponse contains the response from instantiation
// It does NOT include the created resources.
// Use the regular GET to get the created resources for a particular instance
@@ -74,11 +105,13 @@ type InstanceStatus struct {
type InstanceManager interface {
Create(i InstanceRequest) (InstanceResponse, error)
Get(id string) (InstanceResponse, error)
+ GetFull(id string) (InstanceDbData, error)
Status(id string) (InstanceStatus, error)
Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
Delete(id string) error
+ RecoverCreateOrDelete(id string) error
}
// InstanceKey is used as the primary key in the db
@@ -100,13 +133,16 @@ func (dk InstanceKey) String() string {
// InstanceClient implements the InstanceManager interface
// It will also be used to maintain some localized state
type InstanceClient struct {
- storeName string
- tagInst string
+ storeName string
+ tagInst string
}
// NewInstanceClient returns an instance of the InstanceClient
// which implements the InstanceManager
func NewInstanceClient() *InstanceClient {
+ //TODO: Call RecoverCreateOrDelete to perform recovery when the plugin restarts.
+ //Not implemented here yet because we have an issue with the current test set (data race)
+
return &InstanceClient{
storeName: "rbdef",
tagInst: "instance",
@@ -125,7 +161,6 @@ func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName
// Create an instance of rb on the cluster in the database
func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
-
// Name is required
if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" {
return InstanceResponse{},
@@ -141,10 +176,52 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
//Convert override values from map to array of strings of the following format
//foo=bar
overrideValues := []string{}
+ var preInstallTimeOut, postInstallTimeOut, preDeleteTimeout, postDeleteTimeout int64
if i.OverrideValues != nil {
+ preInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-install-timeout"]
+ if !ok {
+ preInstallTimeOutStr = "60"
+ }
+ preInstallTimeOut,err = strconv.ParseInt(preInstallTimeOutStr, 10, 64)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-install-timeout")
+ }
+
+ postInstallTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-install-timeout"]
+ if !ok {
+ postInstallTimeOutStr = "600"
+ }
+ postInstallTimeOut,err = strconv.ParseInt(postInstallTimeOutStr, 10, 64)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-install-timeout")
+ }
+
+ preDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-pre-delete-timeout"]
+ if !ok {
+ preDeleteTimeOutStr = "60"
+ }
+ preDeleteTimeout,err = strconv.ParseInt(preDeleteTimeOutStr, 10, 64)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-pre-delete-timeout")
+ }
+
+ postDeleteTimeOutStr, ok := i.OverrideValues["k8s-rb-instance-post-delete-timeout"]
+ if !ok {
+ postDeleteTimeOutStr = "600"
+ }
+ postDeleteTimeout,err = strconv.ParseInt(postDeleteTimeOutStr, 10, 64)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Error parsing k8s-rb-instance-post-delete-timeout")
+ }
+
for k, v := range i.OverrideValues {
overrideValues = append(overrideValues, k+"="+v)
}
+ } else {
+ preInstallTimeOut = 60
+ postInstallTimeOut = 600
+ preDeleteTimeout = 60
+ postDeleteTimeout = 600
}
//Execute the kubernetes create command
@@ -162,11 +239,93 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
}
- createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
+ log.Printf("Main rss info")
+ for _,t := range sortedTemplates {
+ log.Printf(" Path: %s", t.FilePath)
+ log.Printf(" Kind: %s", t.GVK.Kind)
+ }
+
+ log.Printf("Hook info")
+ for _,h := range hookList {
+ log.Printf(" Name: %s", h.Hook.Name)
+ log.Printf(" Events: %s", h.Hook.Events)
+ log.Printf(" Weight: %d", h.Hook.Weight)
+ log.Printf(" DeletePolicies: %s", h.Hook.DeletePolicies)
+ }
+ dbData := InstanceDbData{
+ ID: id,
+ Request: i,
+ Namespace: profile.Namespace,
+ ReleaseName: releaseName,
+ Status: "PRE-INSTALL",
+ Resources: []helm.KubernetesResource{},
+ Hooks: hookList,
+ HookProgress: "",
+ PreInstallTimeout: preInstallTimeOut,
+ PostInstallTimeout: postInstallTimeOut,
+ PreDeleteTimeout: preDeleteTimeout,
+ PostDeleteTimeout: postDeleteTimeout,
+ }
+
+ key := InstanceKey{
+ ID: id,
+ }
+ err = db.DBconn.Create(v.storeName, key, v.tagInst, dbData)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+ }
+
+ err = k8sClient.ensureNamespace(profile.Namespace)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
+ }
+
+ hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst)
+ if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 {
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, preInstallTimeOut, 0, &dbData)
+ if err != nil {
+ log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err)
+ err2 := db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err2 != nil {
+ log.Printf("Error cleaning failed instance in DB, please check DB.")
+ }
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preinstall hooks")
+ }
+ }
+
+ dbData.Status = "CREATING"
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+ if err != nil {
+ err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err != nil {
+ log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
+ }
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
+ }
+
+ //Main rss creation is supposed to be very quick -> no need to support recovery for main rss
+ createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
if err != nil {
+ if len(createdResources) > 0 {
+ log.Printf("[Instance] Reverting created resources on Error: %s", err.Error())
+ k8sClient.deleteResources(createdResources, profile.Namespace)
+ }
+ log.Printf(" Instance: %s, Main rss are failed, skip post-install and remove instance in DB", id)
+ //main rss creation failed -> remove instance in DB
+ err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err != nil {
+ log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
+ }
return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources")
}
+ dbData.Status = "CREATED"
+ dbData.Resources = createdResources
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+ if err != nil {
+ return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
+ }
+
//Compose the return response
resp := InstanceResponse{
ID: id,
@@ -177,15 +336,71 @@ func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
Hooks: hookList,
}
+ if len(hookClient.getHookByEvent(hookList, release.HookPostInstall)) != 0 {
+ go func() {
+ dbData.Status = "POST-INSTALL"
+ dbData.HookProgress = ""
+ err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, postInstallTimeOut, 0, &dbData)
+ if err != nil {
+ dbData.Status = "POST-INSTALL-FAILED"
+ log.Printf(" Instance: %s, Error running postinstall hooks error: %s", id, err)
+ } else {
+ dbData.Status = "DONE"
+ }
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
+ }
+ }()
+ } else {
+ dbData.Status = "DONE"
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, dbData)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
+ }
+ }
+
+ return resp, nil
+}
+
+// Get returns the full instance for corresponding ID
+func (v *InstanceClient) GetFull(id string) (InstanceDbData, error) {
key := InstanceKey{
ID: id,
}
- err = db.DBconn.Create(v.storeName, key, v.tagInst, resp)
+ value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
if err != nil {
- return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+ return InstanceDbData{}, pkgerrors.Wrap(err, "Get Instance")
}
- return resp, nil
+ //value is a byte array
+ if value != nil {
+ resp := InstanceDbData{}
+ err = db.DBconn.Unmarshal(value, &resp)
+ if err != nil {
+ return InstanceDbData{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
+ }
+ //In case we are communicating with an old DB, some fields may be missing -> fill them with default values
+ if resp.Status == "" {
+ //For an instance already in the DB -> consider it DONE
+ resp.Status = "DONE"
+ }
+ if resp.PreInstallTimeout == 0 {
+ resp.PreInstallTimeout = 60
+ }
+ if resp.PostInstallTimeout == 0 {
+ resp.PostInstallTimeout = 600
+ }
+ if resp.PreDeleteTimeout == 0 {
+ resp.PreDeleteTimeout = 60
+ }
+ if resp.PostDeleteTimeout == 0 {
+ resp.PostDeleteTimeout = 600
+ }
+ return resp, nil
+ }
+
+ return InstanceDbData{}, pkgerrors.New("Error getting Instance")
}
// Get returns the instance for corresponding ID
@@ -214,6 +429,7 @@ func (v *InstanceClient) Get(id string) (InstanceResponse, error) {
// Query returns state of instance's filtered resources
func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) {
+ queryClient := NewQueryClient()
//Read the status from the DB
key := InstanceKey{
ID: id,
@@ -231,54 +447,21 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta
return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
}
- k8sClient := KubernetesClient{}
- err = k8sClient.Init(resResp.Request.CloudRegion, id)
+ resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id)
if err != nil {
- return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
- }
-
- var resourcesStatus []ResourceStatus
- if labels != "" {
- resList, err := k8sClient.queryResources(apiVersion, kind, labels, resResp.Namespace)
- if err != nil {
- return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
- }
- // If user specifies both label and name, we want to pick up only single resource from these matching label
- if name != "" {
- //Assigning 0-length, because we may actually not find matching name
- resourcesStatus = make([]ResourceStatus, 0)
- for _, res := range resList {
- if res.Name == name {
- resourcesStatus = append(resourcesStatus, res)
- break
- }
- }
- } else {
- resourcesStatus = resList
- }
- } else if name != "" {
- resIdentifier := helm.KubernetesResource{
- Name: name,
- GVK: schema.FromAPIVersionAndKind(apiVersion, kind),
- }
- res, err := k8sClient.GetResourceStatus(resIdentifier, resResp.Namespace)
- if err != nil {
- return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resource")
- }
- resourcesStatus = []ResourceStatus{res}
+ return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
}
resp := InstanceStatus{
Request: resResp.Request,
- ResourceCount: int32(len(resourcesStatus)),
- ResourcesStatus: resourcesStatus,
+ ResourceCount: resources.ResourceCount,
+ ResourcesStatus: resources.ResourcesStatus,
}
return resp, nil
}
// Status returns the status for the instance
func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
-
//Read the status from the DB
key := InstanceKey{
ID: id,
@@ -294,7 +477,7 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
return InstanceStatus{}, pkgerrors.New("Status is not available")
}
- resResp := InstanceResponse{}
+ resResp := InstanceDbData{}
err = db.DBconn.Unmarshal(value, &resResp)
if err != nil {
return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
@@ -312,25 +495,36 @@ func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
}
+ isReady := true
generalStatus := make([]ResourceStatus, 0, len(resResp.Resources))
Main:
- for _, resource := range resResp.Resources {
+ for _, oneResource := range resResp.Resources {
for _, pod := range podsStatus {
- if resource.GVK == pod.GVK && resource.Name == pod.Name {
+ if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name {
continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart
}
}
- status, err := k8sClient.GetResourceStatus(resource, resResp.Namespace)
+ status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace)
if err != nil {
cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+ isReady = false
} else {
generalStatus = append(generalStatus, status)
+ ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
+
+ if !ready || err != nil {
+ isReady = false
+ if err != nil {
+ cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+ }
+ }
}
}
+ //We still need to iterate through the resource list even if the status is not DONE, to gather resource and pod status for the response
resp := InstanceStatus{
Request: resResp.Request,
ResourceCount: int32(len(generalStatus) + len(podsStatus)),
- Ready: false, //FIXME To determine readiness, some parsing of status fields is necessary
+ Ready: isReady && resResp.Status == "DONE",
ResourcesStatus: append(generalStatus, podsStatus...),
}
@@ -344,6 +538,68 @@ Main:
return resp, nil
}
+func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error) {
+ readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true))
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel()
+
+ apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
+ log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
+ restClient, err := k8sClient.getRestApi(apiVersion)
+ if err != nil {
+ return false, err
+ }
+ mapper := k8sClient.GetMapper()
+ mapping, err := mapper.RESTMapping(schema.GroupKind{
+ Group: rss.GVK.Group,
+ Kind: rss.GVK.Kind,
+ }, rss.GVK.Version)
+ if err != nil {
+ return false, err
+ }
+ resourceInfo := resource.Info{
+ Client: restClient,
+ Mapping: mapping,
+ Namespace: namespace,
+ Name: rss.Name,
+ Source: "",
+ Object: nil,
+ ResourceVersion: "",
+ }
+
+ var parsedRes runtime.Object
+ //TODO: Should we care about different API versions for the same kind?
+ switch kind {
+ case "Pod":
+ parsedRes = new(corev1.Pod)
+ case "Job":
+ parsedRes = new(batchv1.Job)
+ case "Deployment":
+ parsedRes = new(appsv1.Deployment)
+ case "PersistentVolumeClaim":
+ parsedRes = new(corev1.PersistentVolumeClaim)
+ case "Service":
+ parsedRes = new(corev1.Service)
+ case "DaemonSet":
+ parsedRes = new(appsv1.DaemonSet)
+ case "CustomResourceDefinition":
+ parsedRes = new(apiextv1.CustomResourceDefinition)
+ case "StatefulSet":
+ parsedRes = new(appsv1.StatefulSet)
+ case "ReplicationController":
+ parsedRes = new(corev1.ReplicationController)
+ case "ReplicaSet":
+ parsedRes = new(appsv1.ReplicaSet)
+ default:
+ //For kinds not listed here, consider them ready
+ return true, nil
+ }
+ err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes)
+ if err != nil {
+ return false, err
+ }
+ resourceInfo.Object = parsedRes
+ return readyChecker.IsReady(ctx, &resourceInfo)
+}
+
// List returns the instance for corresponding ID
// Empty string returns all
func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) {
@@ -358,7 +614,7 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance
for key, value := range dbres {
//value is a byte array
if value != nil {
- resp := InstanceResponse{}
+ resp := InstanceDbData{}
err = db.DBconn.Unmarshal(value, &resp)
if err != nil {
log.Printf("[Instance] Error: %s Unmarshaling Instance: %s", err.Error(), key)
@@ -385,6 +641,11 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance
continue
}
+ if resp.Status == "PRE-INSTALL" {
+ //Do not add instances that are in the pre-install phase
+ continue
+ }
+
results = append(results, miniresp)
}
}
@@ -423,7 +684,6 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab
if add {
ret = append(ret, resp)
}
-
}
return ret, nil
@@ -431,29 +691,249 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab
// Delete the Instance from database
func (v *InstanceClient) Delete(id string) error {
- inst, err := v.Get(id)
+ inst, err := v.GetFull(id)
if err != nil {
return pkgerrors.Wrap(err, "Error getting Instance")
}
+ key := InstanceKey{
+ ID: id,
+ }
+ if inst.Status == "DELETED" {
+ //The instance is deleted when the plugin comes back -> just remove from Db
+ err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err != nil {
+ log.Printf("Delete Instance DB Entry for release %s has error.", inst.ReleaseName)
+ }
+ return nil
+ } else if inst.Status != "DONE"{
+ //Recover is ongoing, do nothing here
+ return nil
+ }
k8sClient := KubernetesClient{}
err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
if err != nil {
return pkgerrors.Wrap(err, "Getting CloudRegion Information")
}
+ inst.Status = "PRE-DELETE"
+ inst.HookProgress = ""
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+ }
+
+ hookClient := NewHookClient(inst.Namespace, id, v.storeName, v.tagInst)
+ if len(hookClient.getHookByEvent(inst.Hooks, release.HookPreDelete)) != 0 {
+ err = hookClient.ExecHook(k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst)
+ if err != nil {
+ log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err)
+ inst.Status = "PRE-DELETE-FAILED"
+ err2 := db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+ if err2 != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+ }
+ return pkgerrors.Wrap(err, "Error running pre-delete hooks")
+ }
+ }
+
+ inst.Status = "DELETING"
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
+ }
err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
if err != nil {
return pkgerrors.Wrap(err, "Deleting Instance Resources")
}
+ if len(hookClient.getHookByEvent(inst.Hooks, release.HookPostDelete)) != 0 {
+ go func() {
+ inst.HookProgress = ""
+ if err := v.runPostDelete(k8sClient, hookClient, &inst, 0, true); err != nil {
+ log.Println(err.Error())
+ }
+ }()
+ } else {
+ err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Delete Instance")
+ }
+ }
+ return nil
+}
+
+// RecoverCreateOrDelete continues an interrupted instantiation or deletion
+func (v *InstanceClient) RecoverCreateOrDelete(id string) error {
+ instance, err := v.GetFull(id)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error getting instance " + id + ", skip this instance. Error detail")
+ }
+ log.Printf("Instance " + id + ", status: " + instance.Status + ", HookProgress: " + instance.HookProgress)
+ //have to resolve the template again for this instance because all templates live in /tmp -> they are lost when the container restarts
+ overrideValues := []string{}
+ if instance.Request.OverrideValues != nil {
+ for k, v := range instance.Request.OverrideValues {
+ overrideValues = append(overrideValues, k + "=" + v)
+ }
+ }
key := InstanceKey{
ID: id,
}
- err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ log.Printf(" Resolving template for release %s", instance.Request.ReleaseName)
+ _, hookList, _, err := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error re-resolving profile for instance " + id)
+ }
+ instance.Hooks = hookList
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Update Instance DB Entry")
+ }
+
+ if strings.Contains(instance.Status, "FAILED"){
+ log.Printf(" This instance has failed during instantiation, not going to recover")
+ return nil
+ } else if !strings.Contains(instance.Status, "-INSTALL") && !strings.Contains(instance.Status, "-DELETE") {
+ log.Printf(" This instance is not in hook state, not going to recover")
+ return nil
+ }
+
+ splitHookProgress := strings.Split(instance.HookProgress, "/")
+ completedHooks, err := strconv.Atoi(splitHookProgress[0])
+ if err != nil {
+ return pkgerrors.Wrap(err, "Error parsing completed hook count for instance " + instance.ID + ", skipping. Error detail")
+ }
+
+ //we could add an option to delete instances that will not be recovered from the database, to keep the DB clean
+ if (instance.Status != "POST-INSTALL") && (instance.Status != "PRE-DELETE") && (instance.Status != "POST-DELETE") {
+ if instance.Status == "PRE-INSTALL" {
+ //Plugin quits during pre-install hooks -> do nothing: from the SO point of view there is no instance ID, so the create is reported as failed and rolled back
+ log.Printf(" The plugin quits during pre-install hook of this instance, not going to recover")
+ }
+ return nil
+ }
+ k8sClient := KubernetesClient{}
+ err = k8sClient.Init(instance.Request.CloudRegion, id)
+ if err != nil {
+ log.Printf(" Error getting CloudRegion %s", instance.Request.CloudRegion)
+ return nil
+ }
+ hookClient := NewHookClient(instance.Namespace, id, v.storeName, v.tagInst)
+ switch instance.Status {
+ case "POST-INSTALL":
+ //Plugin quits during post-install hooks -> continue
+ go func() {
+ log.Printf(" The plugin quits during post-install hook of this instance, continue post-install hook")
+ err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance)
+ log.Printf("dbData.HookProgress %s", instance.HookProgress)
+ if err != nil {
+ instance.Status = "POST-INSTALL-FAILED"
+ log.Printf(" Instance: %s, Error running postinstall hooks error: %s", id, err)
+ } else {
+ instance.Status = "DONE"
+ }
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+ }
+ }()
+ case "PRE-DELETE":
+ //Plugin quits during pre-delete hooks -> this has already affected the instance -> should continue the deletion
+ go func() {
+ log.Printf(" The plugin quits during pre-delete hook of this instance, continue pre-delete hook")
+ err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance)
+ if err != nil {
+ log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err)
+ instance.Status = "PRE-DELETE-FAILED"
+ err = db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+ }
+ return
+ }
+
+ err = k8sClient.deleteResources(instance.Resources, instance.Namespace)
+ if err != nil {
+ log.Printf(" Error running deleting instance resources, error: %s", err)
+ return
+ }
+ //do not delete the instance from the DB, to avoid an error when SO calls delete again and there is no instance in the DB
+ //the DB entry will be deleted when SO calls delete again.
+ instance.HookProgress = ""
+ if err := v.runPostDelete(k8sClient, hookClient, &instance, 0, false); err != nil {
+ log.Println(err.Error())
+ }
+ }()
+ case "POST-DELETE":
+ //Plugin quits during post-delete hooks -> continue
+ go func() {
+ log.Printf(" The plugin quits during post-delete hook of this instance, continue post-delete hook")
+ if err := v.runPostDelete(k8sClient, hookClient, &instance, completedHooks, true); err != nil {
+ log.Println(err.Error())
+ }
+ }()
+ default:
+ log.Printf(" This instance is not in hook state, not going to recover")
+ }
+
+ return nil
+}
+
+func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error {
+ key := InstanceKey{
+ ID: instance.ID,
+ }
+ instance.Status = "POST-DELETE"
+ err := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
if err != nil {
- return pkgerrors.Wrap(err, "Delete Instance")
+ log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+ }
+ err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance)
+ if err != nil {
+ //If this happens, the user should clean up the cluster manually
+ log.Printf(" Instance: %s, Error running post-delete hooks: %s", instance.ID, err)
+ instance.Status = "POST-DELETE-FAILED"
+ err2 := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+ if err2 != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+ return pkgerrors.Wrap(err2, "Delete Instance DB Entry")
+ }
+ return pkgerrors.Wrap(err, "Error running post-delete hooks")
}
+ if clearDb {
+ err = db.DBconn.Delete(v.storeName, key, v.tagInst)
+ if err != nil {
+ log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName)
+ return pkgerrors.Wrap(err, "Delete Instance DB Entry")
+ }
+ } else {
+ instance.Status = "DELETED"
+ err := db.DBconn.Update(v.storeName, key, v.tagInst, instance)
+ if err != nil {
+ log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
+ return pkgerrors.Wrap(err, "Update Instance DB Entry")
+ }
+ }
+
+ go func() {
+ //Clear all hook resources that do not have a delete-on-success deletion policy
+ log.Printf("Cleaning leftover hook resources")
+ var remainHookRss []helm.KubernetesResource
+ for _, h := range instance.Hooks {
+ res := helm.KubernetesResource{
+ GVK: h.KRT.GVK,
+ Name: h.Hook.Name,
+ }
+ if _, err := k8sClient.GetResourceStatus(res, hookClient.kubeNameSpace); err == nil {
+ remainHookRss = append(remainHookRss, res)
+ log.Printf(" Rss %s will be deleted.", res.Name)
+ }
+ }
+ if len(remainHookRss) > 0 {
+ err = k8sClient.deleteResources(remainHookRss, hookClient.kubeNameSpace)
+ if err != nil {
+ log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error())
+ }
+ }
+ }()
return nil
}
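
The status strings threaded through Create, Delete and RecoverCreateOrDelete form a small lifecycle state machine. A sketch of that lifecycle with hypothetical constant names (the patch itself uses raw string literals throughout):

    package app

    // Hypothetical names for the raw status literals used in this patch;
    // the plugin stores them as plain strings in the instance DB entry.
    const (
        statusPreInstall  = "PRE-INSTALL"  // pre-install hooks running; instance hidden from List
        statusCreated     = "CREATED"      // main resources applied to the cluster
        statusPostInstall = "POST-INSTALL" // post-install hooks running asynchronously
        statusDone        = "DONE"         // instance ready; the state Delete requires to proceed
        statusPreDelete   = "PRE-DELETE"   // pre-delete hooks running
        statusDeleting    = "DELETING"     // main resources being removed
        statusPostDelete  = "POST-DELETE"  // post-delete hooks running
        statusDeleted     = "DELETED"      // resources gone; DB entry removed on the next Delete call
        // hook failures are recorded with a "-FAILED" suffix, e.g. "POST-INSTALL-FAILED"
    )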
diff --git a/src/k8splugin/internal/app/instance_test.go b/src/k8splugin/internal/app/instance_test.go
index 2711a52f..890c4c99 100644
--- a/src/k8splugin/internal/app/instance_test.go
+++ b/src/k8splugin/internal/app/instance_test.go
@@ -1,5 +1,6 @@
/*
Copyright 2018 Intel Corporation.
+Copyright © 2021 Nokia Bell Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -15,13 +16,13 @@ package app
import (
"encoding/base64"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
"io/ioutil"
"log"
"reflect"
"sort"
"testing"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
@@ -794,3 +795,99 @@ func TestInstanceDelete(t *testing.T) {
}
})
}
+
+//TODO: add a test where a pre-hook fails (if possible)
+func TestInstanceWithHookCreate(t *testing.T) {
+ err := LoadMockPlugins(utils.LoadedPlugins)
+ if err != nil {
+ t.Fatalf("LoadMockPlugins returned an error (%s)", err)
+ }
+
+ // Load the mock kube config file into memory
+ fd, err := ioutil.ReadFile("../../mock_files/mock_configs/mock_kube_config")
+ if err != nil {
+ t.Fatal("Unable to read mock_kube_config")
+ }
+
+ t.Run("Successfully create Instance With Hook", func(t *testing.T) {
+ db.DBconn = &db.MockDB{
+ Items: map[string]map[string][]byte{
+ rb.ProfileKey{RBName: "test-rbdef-hook", RBVersion: "v1",
+ ProfileName: "profile1"}.String(): {
+ "profilemetadata": []byte(
+ "{\"profile-name\":\"profile1\"," +
+ "\"release-name\":\"testprofilereleasename\"," +
+ "\"namespace\":\"testnamespace\"," +
+ "\"rb-name\":\"test-rbdef\"," +
+ "\"rb-version\":\"v1\"," +
+ "\"kubernetesversion\":\"1.12.3\"}"),
+ // base64 encoding of vagrant/tests/vnfs/testrb/helm/profile
+ "profilecontent": []byte("H4sICCVd3FwAA3Byb2ZpbGUxLnRhcgDt1NEKgjAUxvFd7ylG98aWO" +
+ "sGXiYELxLRwJvj2rbyoIPDGiuD/uzmwM9iB7Vvruvrgw7CdXHsUn6Ejm2W3aopcP9eZL" +
+ "YRJM1voPN+ZndAm16kVSn9onheXMLheKeGqfdM0rq07/3bfUv9PJUkiR9+H+tSVajRym" +
+ "M6+lEqN7njxoVSbU+z2deX388r9nWzkr8fGSt5d79pnLOZfm0f+dRrzb7P4DZD/LyDJA" +
+ "AAAAAAAAAAAAAAA/+0Ksq1N5QAoAAA="),
+ },
+ rb.DefinitionKey{RBName: "test-rbdef-hook", RBVersion: "v1"}.String(): {
+ "defmetadata": []byte(
+ "{\"rb-name\":\"test-rbdef-hook\"," +
+ "\"rb-version\":\"v1\"," +
+ "\"chart-name\":\"test\"," +
+ "\"description\":\"testresourcebundle\"}"),
+ // base64 encoding of test helm package with hooks inside
+ "defcontent": []byte("H4sICE+Q8WAAA3Rlc3QudGFyAO1aW2+jOBTOM7/CYl6HlEsIq7xV24" +
+ "fVqluNdlYjrVajkQMnhS1gFjvZjbr972MDJYTQwGhMMmn9qVUaYx/o8TnfuRgGlF1NxoX" +
+ "J4Xmu+LQ812x+PmNiOXzEMe3ZfD4xLdO23QlyR36uAmvKcI7QhIXs6Ly+6xcKJvZ/g+M1" +
+ "0OkWJ/EY9xAbPJ/PXtx/m9tGtf+WOePjlu143gSZYzxMG298/9+hG1jhdcxQaQRoRXKU5" +
+ "WBEKVdMHEM+1d6hP8KIIv6D0Z/Xv90afE6CGYMAraIYxIQb8GOcAxeSR3gZczmMoCWgDF" +
+ "PKp0Up/8pCQAySLMbc6KYaDpIoXWgIhYQ8fAkgBgZfMhJH/naBdDFo0LXvAwQQvOey+E3" +
+ "BKIb9HDCLSKqfW3mvAIX//xzinI3m/r3+b7nzZ/83Z57gf9tyHeX/pwDOok+QU+5NC7Sx" +
+ "NJxl9VfdmppTU9cCoH4eZawYvEa/QJwgX1hMwRXCgKL0HiWcQyI/JutAS3ECi+KCtnkWV" +
+ "sjSzv3fKrRR+H/NyuNkgoPyv5npzRzxOxP+b9uOyv9Ogdb+BxgSklKQGg36+N+zZ7v9tw" +
+ "X/u3xM8f8p0OR/Tv70igeBhygNFuimMIWPwLQEGA4wwyJZK7n98RFNf+cZG6YwveMj6On" +
+ "JqE2nmkUz7POp+uPj3tRi+OlJ57NivISYCqlI3LtPLM3AF5Mpn+EzkpcLeSLqh7cNSYNk" +
+ "oToTraQ0/kWBeE/gQJH80apHFPBJynCUcuU+jxiV9uortfgowfdCV8s13S7Jf3p9gbKAJ" +
+ "8mI5WuoxxjbtkZ8kiRY7NlfOg31z9+y/y3/zwhlRpmLG3+TpRwW6PF/25l7Vf5nWZaIE9" +
+ "ac/6H8/xRo+v9SuNKOAH4ly4Gu37IaSy4DdEjHaUpYUQNWi/WQZ6VTGl6JAlFfoMaaw+v" +
+ "GvxDdh4xP042f9I7r1c3KYlQvn+pT2SMpqtbpYcmK/kf/rAkTD1wT1RL7D2S1uo2SiC2Q" +
+ "I490OjSyzz2Up+fwISc+UHq324kGaeQg7J59qOrtO9jUdHRIXDvqojFAZhwS2BEK26cns" +
+ "V5/z2sLU/+sGYahjWGA9qgGaMs0QPMV2J89tv31Wd+LttdlebawvHPT7g+DdvzPQXr474" +
+ "//7i7+25Yt8n/PVPH/JJBDv3tWIzv8HwjvJ996yWsM/gf6eOOxf08fskP/gXBZxneZgf9" +
+ "AHSruXzZa8Z9Cvol8kHsW1Nf/K+r/sv83dx3R/5u5rjr/PQla5z8l+X4srWAgAVc2I7nt" +
+ "B1lMtgmku75fRnJWLTOKLwtkces56AgOkXlutf8waPf/axVJpIDe/r9jtc5/XNszlf+fA" +
+ "kf6/ztvGXgAsFswNhV8xxFA8yFlnQE0ZV7YIUBH/V+9+XOy/v/M9qxd/PfMsv/vKv8/BY" +
+ "7F/2vfJ+vB7k9xUaJwC6oMaKh/dy0cVGXtph+p8d0R6iyptWvD3UbonLSky9PrxfZOWhp" +
+ "RzZOGQkbonrSkSzPAi+2ftBRyYQ2UtuV9Z87YVMhY+eOL95Bmi9YQW9Q7X2GWkNLuP6V8" +
+ "Sx2Q1B5La48yXFdq25XcHqS3qoKXg673f2QXAL3nf17j/M8U539zx1T5/0kg7/WLEfPYD" +
+ "vHDXsB4xZlsh07eeCrb0sgYLwF9czI71AgvM5vtUMmFpbPnpl8FBQUFBQUFBQUFBQUFBQ" +
+ "UFBQUFhdHwFf2f+3IAUAAA"),
+ },
+ connection.ConnectionKey{CloudRegion: "mock_connection"}.String(): {
+ "metadata": []byte(
+ "{\"cloud-region\":\"mock_connection\"," +
+ "\"cloud-owner\":\"mock_owner\"," +
+ "\"kubeconfig\": \"" + base64.StdEncoding.EncodeToString(fd) + "\"}"),
+ },
+ },
+ }
+
+ ic := NewInstanceClient()
+ input := InstanceRequest{
+ RBName: "test-rbdef-hook",
+ RBVersion: "v1",
+ ProfileName: "profile1",
+ CloudRegion: "mock_connection",
+ }
+
+ ir, err := ic.Create(input)
+ if err != nil {
+ t.Fatalf("TestInstanceWithHookCreate returned an error (%s)", err)
+ }
+
+ log.Println(ir)
+
+ if len(ir.Resources) == 0 {
+ t.Fatalf("TestInstanceWithHookCreate returned empty data (%+v)", ir)
+ }
+ })
+}
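
The defcontent and profilecontent blobs above are gzipped tar archives encoded with standard base64. A sketch of how such a fixture could be regenerated from an existing .tar.gz chart package (the file name is illustrative):

    package main

    import (
        "encoding/base64"
        "fmt"
        "io/ioutil"
    )

    func main() {
        // Read a packaged chart and print it in the form the mock DB expects.
        raw, err := ioutil.ReadFile("test.tar.gz") // illustrative path
        if err != nil {
            panic(err)
        }
        fmt.Println(base64.StdEncoding.EncodeToString(raw))
    }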
diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go
new file mode 100644
index 00000000..cb645afd
--- /dev/null
+++ b/src/k8splugin/internal/app/query.go
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018 Intel Corporation, Inc
+ * Copyright © 2021 Samsung Electronics
+ * Copyright © 2021 Orange
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+
+ pkgerrors "github.com/pkg/errors"
+)
+
+// QueryStatus is what is returned when status is queried for an instance
+type QueryStatus struct {
+ ResourceCount int32 `json:"resourceCount"`
+ ResourcesStatus []ResourceStatus `json:"resourcesStatus"`
+}
+
+// QueryManager is an interface that exposes the query functionality
+type QueryManager interface {
+ Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error)
+}
+
+// QueryClient implements the QueryManager interface
+// It will also be used to maintain some localized state
+type QueryClient struct {
+ storeName string
+ tagInst string
+}
+
+// NewQueryClient returns an instance of the QueryClient
+// which implements the QueryManager
+func NewQueryClient() *QueryClient {
+ return &QueryClient{
+ storeName: "rbdef",
+ tagInst: "instance",
+ }
+}
+
+// Query returns state of instance's filtered resources
+func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels, id string) (QueryStatus, error) {
+
+ k8sClient := KubernetesClient{}
+ err := k8sClient.Init(cloudRegion, id)
+ if err != nil {
+ return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
+ }
+
+ var resourcesStatus []ResourceStatus
+ if labels != "" {
+ resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace)
+ if err != nil {
+ return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
+ }
+ // If the user specifies both label and name, we want to pick up only a single resource from those matching the label
+ if name != "" {
+ //Assign zero length, because we may not actually find a matching name
+ resourcesStatus = make([]ResourceStatus, 0)
+ for _, res := range resList {
+ if res.Name == name {
+ resourcesStatus = append(resourcesStatus, res)
+ break
+ }
+ }
+ } else {
+ resourcesStatus = resList
+ }
+ } else if name != "" {
+ resIdentifier := helm.KubernetesResource{
+ Name: name,
+ GVK: schema.FromAPIVersionAndKind(apiVersion, kind),
+ }
+ res, err := k8sClient.GetResourceStatus(resIdentifier, namespace)
+ if err != nil {
+ return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resource")
+ }
+ resourcesStatus = []ResourceStatus{res}
+ } else {
+ resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace)
+ if err != nil {
+ return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
+ }
+ resourcesStatus = resList
+ }
+
+ resp := QueryStatus{
+ ResourceCount: int32(len(resourcesStatus)),
+ ResourcesStatus: resourcesStatus,
+ }
+ return resp, nil
+}
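
A minimal usage sketch for the new QueryClient; the namespace, region name, selector and instance ID below are illustrative:

    package main

    import (
        "log"

        "github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
    )

    func main() {
        queryClient := app.NewQueryClient()
        // An empty name plus a label selector returns every matching resource;
        // setting both narrows the result to the single named resource.
        status, err := queryClient.Query("testnamespace", "mock_connection",
            "apps/v1", "Deployment", "", "app=test", "inst-1")
        if err != nil {
            log.Fatalf("query failed: %v", err)
        }
        log.Printf("%d resources matched", status.ResourceCount)
    }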
diff --git a/src/k8splugin/internal/db/etcd.go b/src/k8splugin/internal/db/etcd.go
index 97771a07..a435b435 100644
--- a/src/k8splugin/internal/db/etcd.go
+++ b/src/k8splugin/internal/db/etcd.go
@@ -36,6 +36,7 @@ type EtcdConfig struct {
// EtcdStore Interface needed for mocking
type EtcdStore interface {
Get(key string) ([]byte, error)
+ GetAll(key string) ([][]byte, error)
Put(key, value string) error
Delete(key string) error
}
@@ -114,7 +115,7 @@ func (e EtcdClient) Get(key string) ([]byte, error) {
}
getResp, err := e.cli.Get(context.Background(), key)
if err != nil {
- return nil, pkgerrors.Errorf("Error getitng etcd entry: %s", err.Error())
+ return nil, pkgerrors.Errorf("Error getting etcd entry: %s", err.Error())
}
if getResp.Count == 0 {
return nil, pkgerrors.Errorf("Key doesn't exist")
@@ -122,6 +123,22 @@ func (e EtcdClient) Get(key string) ([]byte, error) {
return getResp.Kvs[0].Value, nil
}
+// GetAll returns all values stored under the given key prefix in the Etcd DB
+func (e EtcdClient) GetAll(key string) ([][]byte, error) {
+ if e.cli == nil {
+ return nil, pkgerrors.Errorf("Etcd Client not initialized")
+ }
+ getResp, err := e.cli.Get(context.Background(), key, clientv3.WithPrefix())
+ if err != nil {
+ return nil, pkgerrors.Errorf("Error getting etcd entry: %s", err.Error())
+ }
+ result := make([][]byte, 0)
+ for _, v := range getResp.Kvs {
+ result = append(result, v.Value)
+ }
+ return result, nil
+}
+
// Delete values from Etcd DB
func (e EtcdClient) Delete(key string) error {
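
GetAll relies on etcd's range reads: clientv3.WithPrefix turns a Get into a scan of every key sharing the prefix. A standalone sketch of that behavior (endpoint and key layout are illustrative; the clientv3 import path is assumed to match the plugin's etcd dependency):

    package main

    import (
        "context"
        "fmt"

        "go.etcd.io/etcd/clientv3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
        if err != nil {
            panic(err)
        }
        defer cli.Close()
        // Every key written under this prefix comes back in one response,
        // which is what EtcdClient.GetAll iterates over.
        resp, err := cli.Get(context.Background(), "/instance/inst-1/", clientv3.WithPrefix())
        if err != nil {
            panic(err)
        }
        for _, kv := range resp.Kvs {
            fmt.Printf("%s -> %s\n", kv.Key, kv.Value)
        }
    }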
diff --git a/src/k8splugin/internal/db/etcd_testing.go b/src/k8splugin/internal/db/etcd_testing.go
index 12b17e33..9dfcad82 100644
--- a/src/k8splugin/internal/db/etcd_testing.go
+++ b/src/k8splugin/internal/db/etcd_testing.go
@@ -14,6 +14,8 @@ limitations under the License.
package db
import (
+ "strings"
+
pkgerrors "github.com/pkg/errors"
)
@@ -39,6 +41,16 @@ func (c *MockEtcdClient) Get(key string) ([]byte, error) {
return nil, pkgerrors.Errorf("Key doesn't exist")
}
+func (c *MockEtcdClient) GetAll(key string) ([][]byte, error) {
+ result := make([][]byte, 0)
+ for kvKey, kvValue := range c.Items {
+ if strings.HasPrefix(kvKey, key) {
+ result = append(result, []byte(kvValue))
+ }
+ }
+ return result, nil
+}
+
func (c *MockEtcdClient) Delete(key string) error {
delete(c.Items, key)
return c.Err
diff --git a/src/k8splugin/internal/helm/helm.go b/src/k8splugin/internal/helm/helm.go
index 3c25ac8c..849674a9 100644
--- a/src/k8splugin/internal/helm/helm.go
+++ b/src/k8splugin/internal/helm/helm.go
@@ -19,14 +19,13 @@ package helm
import (
"fmt"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
-
pkgerrors "github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart/loader"
diff --git a/src/k8splugin/internal/plugin/helpers.go b/src/k8splugin/internal/plugin/helpers.go
index 7078b813..29213076 100644
--- a/src/k8splugin/internal/plugin/helpers.go
+++ b/src/k8splugin/internal/plugin/helpers.go
@@ -1,5 +1,6 @@
/*
* Copyright 2019 Intel Corporation, Inc
+ * Copyright © 2021 Nokia Bell Labs.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +18,14 @@
package plugin
import (
+ "k8s.io/client-go/rest"
"log"
"strings"
+ "time"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
pkgerrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
@@ -71,6 +74,16 @@ type Reference interface {
//Update kubernetes resource based on the groupVersionKind and resourceName provided in resource
Update(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+
+ //WatchUntilReady watches a kubernetes resource until it is ready
+ WatchUntilReady(timeout time.Duration,
+ ns string,
+ res helm.KubernetesResource,
+ mapper meta.RESTMapper,
+ restClient rest.Interface,
+ objType runtime.Object,
+ clientSet kubernetes.Interface) error
}
// GetPluginByKind returns a plugin by the kind name
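
The Reference interface now requires every resource plugin to implement WatchUntilReady. A hypothetical no-op implementation satisfying the new method, for a plugin whose resources need no readiness watch (the receiver type is illustrative):

    package plugin

    import (
        "time"

        "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
    )

    // genericPlugin is a sketch; real plugins live in their own packages.
    type genericPlugin struct{}

    func (g genericPlugin) WatchUntilReady(timeout time.Duration, ns string,
        res helm.KubernetesResource, mapper meta.RESTMapper,
        restClient rest.Interface, objType runtime.Object,
        clientSet kubernetes.Interface) error {
        // Nothing to watch: report the resource as immediately ready.
        return nil
    }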
diff --git a/src/k8splugin/internal/plugin/helpers_test.go b/src/k8splugin/internal/plugin/helpers_test.go
index b968072f..34faf9a5 100644
--- a/src/k8splugin/internal/plugin/helpers_test.go
+++ b/src/k8splugin/internal/plugin/helpers_test.go
@@ -17,11 +17,11 @@
package plugin
import (
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
"testing"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
"github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
)
diff --git a/src/k8splugin/internal/rb/archive.go b/src/k8splugin/internal/rb/archive.go
index 267c7cd2..c70dfd6c 100644
--- a/src/k8splugin/internal/rb/archive.go
+++ b/src/k8splugin/internal/rb/archive.go
@@ -19,13 +19,12 @@ package rb
import (
"archive/tar"
"compress/gzip"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
pkgerrors "github.com/pkg/errors"
"io"
"io/ioutil"
"os"
"path/filepath"
-
- utils "github.com/onap/multicloud-k8s/src/k8splugin/internal"
)
func isTarGz(r io.Reader) error {
diff --git a/src/k8splugin/internal/statuscheck/converter.go b/src/k8splugin/internal/statuscheck/converter.go
new file mode 100644
index 00000000..8f411c41
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/converter.go
@@ -0,0 +1,69 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // copied from helm.sh/helm/v3/pkg/kube
+
+import (
+ "sync"
+
+ apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes/scheme"
+)
+
+var k8sNativeScheme *runtime.Scheme
+var k8sNativeSchemeOnce sync.Once
+
+// AsVersioned converts the given info into a runtime.Object with the correct
+// group and version set
+func AsVersioned(info *resource.Info) runtime.Object {
+ return convertWithMapper(info.Object, info.Mapping)
+}
+
+// convertWithMapper converts the given object with the optional provided
+// RESTMapping. If no mapping is provided, the default schema versioner is used
+func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object {
+ s := kubernetesNativeScheme()
+ var gv = runtime.GroupVersioner(schema.GroupVersions(s.PrioritizedVersionsAllGroups()))
+ if mapping != nil {
+ gv = mapping.GroupVersionKind.GroupVersion()
+ }
+ if obj, err := runtime.ObjectConvertor(s).ConvertToVersion(obj, gv); err == nil {
+ return obj
+ }
+ return obj
+}
+
+// kubernetesNativeScheme returns a clean *runtime.Scheme with _only_ Kubernetes
+// native resources added to it. This is required to break free of custom resources
+// that may have been added to scheme.Scheme due to Helm being used as a package in
+// combination with e.g. a versioned kube client. If we would not do this, the client
+// may attempt to perform e.g. a 3-way-merge strategy patch for custom resources.
+func kubernetesNativeScheme() *runtime.Scheme {
+ k8sNativeSchemeOnce.Do(func() {
+ k8sNativeScheme = runtime.NewScheme()
+ scheme.AddToScheme(k8sNativeScheme)
+ // API extensions are not in the above scheme set,
+ // and must thus be added separately.
+ apiextensionsv1beta1.AddToScheme(k8sNativeScheme)
+ apiextensionsv1.AddToScheme(k8sNativeScheme)
+ })
+ return k8sNativeScheme
+}
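
AsVersioned is what lets IsReady (in ready.go below) type-switch on concrete API types: the unstructured object carried by a resource.Info is converted back to its registered Go struct. A small illustrative sketch:

    package example

    import (
        "fmt"

        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/cli-runtime/pkg/resource"

        "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
    )

    func describe(info *resource.Info) {
        // After conversion the switch can match typed cases, exactly as
        // ReadyChecker.IsReady does.
        switch statuscheck.AsVersioned(info).(type) {
        case *appsv1.Deployment:
            fmt.Println("deployment: readiness checked via its ReplicaSet")
        default:
            fmt.Println("unhandled kind: treated as ready")
        }
    }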
diff --git a/src/k8splugin/internal/statuscheck/ready.go b/src/k8splugin/internal/statuscheck/ready.go
new file mode 100644
index 00000000..7bea536a
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/ready.go
@@ -0,0 +1,393 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // copied from helm.sh/helm/v3/pkg/kube
+
+import (
+ "context"
+ "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+ appsv1 "k8s.io/api/apps/v1"
+ appsv1beta1 "k8s.io/api/apps/v1beta1"
+ appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+ apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/cli-runtime/pkg/resource"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "log"
+)
+
+// ReadyCheckerOption is a function that configures a ReadyChecker.
+type ReadyCheckerOption func(*ReadyChecker)
+
+// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider paused resources to be ready. For example a Deployment
+// with spec.paused equal to true would be considered ready.
+func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
+ return func(c *ReadyChecker) {
+ c.pausedAsReady = pausedAsReady
+ }
+}
+
+// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider readiness of Job resources.
+func CheckJobs(checkJobs bool) ReadyCheckerOption {
+ return func(c *ReadyChecker) {
+ c.checkJobs = checkJobs
+ }
+}
+
+// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can
+// be used to override defaults.
+func NewReadyChecker(cl kubernetes.Interface, opts ...ReadyCheckerOption) ReadyChecker {
+ c := ReadyChecker{
+ client: cl,
+ }
+
+ for _, opt := range opts {
+ opt(&c)
+ }
+
+ return c
+}
+
+// ReadyChecker is a type that can check core Kubernetes types for readiness.
+type ReadyChecker struct {
+ client kubernetes.Interface
+ checkJobs bool
+ pausedAsReady bool
+}
+
+// IsReady checks if v is ready. It supports checking readiness for pods,
+// deployments, persistent volume claims, services, daemon sets, custom
+// resource definitions, stateful sets, replication controllers, and replica
+// sets. All other resource kinds are always considered ready.
+//
+// IsReady will fetch the latest state of the object from the server prior to
+// performing readiness checks, and it will return any error encountered.
+func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
+ var (
+ // This defaults to true, otherwise we get to a point where
+ // things will always return false unless one of the objects
+ // that manages pods has been hit
+ ok = true
+ err error
+ )
+ switch value := AsVersioned(v).(type) {
+ case *corev1.Pod:
+ pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil || !c.isPodReady(pod) {
+ return false, err
+ }
+ case *batchv1.Job:
+ if c.checkJobs {
+ job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil || !c.jobReady(job) {
+ return false, err
+ }
+ }
+ case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
+ currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ // If paused, the deployment will never be ready
+ if currentDeployment.Spec.Paused {
+ return c.pausedAsReady, nil
+ }
+ // Find RS associated with deployment
+ newReplicaSet, err := utils.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
+ if err != nil || newReplicaSet == nil {
+ return false, err
+ }
+ if !c.deploymentReady(newReplicaSet, currentDeployment) {
+ return false, nil
+ }
+ case *corev1.PersistentVolumeClaim:
+ claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.volumeReady(claim) {
+ return false, nil
+ }
+ case *corev1.Service:
+ svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.serviceReady(svc) {
+ return false, nil
+ }
+ case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
+ ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.daemonSetReady(ds) {
+ return false, nil
+ }
+ case *apiextv1beta1.CustomResourceDefinition:
+ if err := v.Get(); err != nil {
+ return false, err
+ }
+ crd := &apiextv1beta1.CustomResourceDefinition{}
+ if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+ return false, err
+ }
+ if !c.crdBetaReady(*crd) {
+ return false, nil
+ }
+ case *apiextv1.CustomResourceDefinition:
+ if err := v.Get(); err != nil {
+ return false, err
+ }
+ crd := &apiextv1.CustomResourceDefinition{}
+ if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+ return false, err
+ }
+ if !c.crdReady(*crd) {
+ return false, nil
+ }
+ case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
+ sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ if !c.statefulSetReady(sts) {
+ return false, nil
+ }
+ case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
+ ok, err = c.podsReadyForObject(ctx, v.Namespace, value)
+ }
+ if !ok || err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
+func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
+ pods, err := c.podsforObject(ctx, namespace, obj)
+ if err != nil {
+ return false, err
+ }
+ for _, pod := range pods {
+ if !c.isPodReady(&pod) {
+ return false, nil
+ }
+ }
+ return true, nil
+}
+
+func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
+ selector, err := SelectorsForObject(obj)
+ if err != nil {
+ return nil, err
+ }
+ list, err := getPods(ctx, c.client, namespace, selector.String())
+ return list, err
+}
+
+// isPodReady returns true if a pod is ready; false otherwise.
+func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
+ for _, c := range pod.Status.Conditions {
+ if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
+ return true
+ }
+ }
+ log.Printf("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
+ return false
+}
+
+func (c *ReadyChecker) jobReady(job *batchv1.Job) bool {
+ if job.Status.Failed > *job.Spec.BackoffLimit {
+ log.Printf("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
+ return false
+ }
+ if job.Status.Succeeded < *job.Spec.Completions {
+ log.Printf("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
+ // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
+ if s.Spec.Type == corev1.ServiceTypeExternalName {
+ return true
+ }
+
+ // Ensure that the service cluster IP is not empty
+ if s.Spec.ClusterIP == "" {
+ log.Printf("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
+ return false
+ }
+
+ // This checks if the service has a LoadBalancer and that balancer has an Ingress defined
+ if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
+ // do not wait when at least 1 external IP is set
+ if len(s.Spec.ExternalIPs) > 0 {
+ log.Printf("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
+ return true
+ }
+
+ if s.Status.LoadBalancer.Ingress == nil {
+ log.Printf("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
+ return false
+ }
+ }
+
+ return true
+}
+
+func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
+ if v.Status.Phase != corev1.ClaimBound {
+ log.Printf("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
+ expectedReady := *dep.Spec.Replicas - utils.MaxUnavailable(*dep)
+ if !(rs.Status.ReadyReplicas >= expectedReady) {
+ log.Printf("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
+ return false
+ }
+ return true
+}
+
+func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
+ return true
+ }
+
+ // Make sure all the updated pods have been scheduled
+ if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
+ log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
+ return false
+ }
+ maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
+ if err != nil {
+ // If for some reason the value is invalid, set max unavailable to the
+ // number of desired replicas. This is the same behavior as the
+ // `MaxUnavailable` function in deploymentutil
+ maxUnavailable = int(ds.Status.DesiredNumberScheduled)
+ }
+
+ expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
+ if !(int(ds.Status.NumberReady) >= expectedReady) {
+ log.Printf("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
+ return false
+ }
+ return true
+}
+
+// Because the v1 extensions API is not available on all supported k8s versions
+// yet and because Go doesn't support generics, we need to have a duplicate
+// function to support the v1beta1 types
+func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
+ for _, cond := range crd.Status.Conditions {
+ switch cond.Type {
+ case apiextv1beta1.Established:
+ if cond.Status == apiextv1beta1.ConditionTrue {
+ return true
+ }
+ case apiextv1beta1.NamesAccepted:
+ if cond.Status == apiextv1beta1.ConditionFalse {
+ // This indicates a naming conflict, but it's probably not the
+ // job of this function to fail because of that. Instead,
+ // we treat it as a success, since the process should be able to
+ // continue.
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
+ for _, cond := range crd.Status.Conditions {
+ switch cond.Type {
+ case apiextv1.Established:
+ if cond.Status == apiextv1.ConditionTrue {
+ return true
+ }
+ case apiextv1.NamesAccepted:
+ if cond.Status == apiextv1.ConditionFalse {
+ // This indicates a naming conflict, but it's probably not the
+ // job of this function to fail because of that. Instead,
+ // we treat it as a success, since the process should be able to
+ // continue.
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
+ // If the update strategy is not a rolling update, there will be nothing to wait for
+ if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
+ return true
+ }
+
+ // Dereference all the pointers because StatefulSets like them
+ var partition int
+ // 1 is the default for replicas if not set
+ var replicas = 1
+ // For some reason, even if the update strategy is a rolling update, the
+ // actual rollingUpdate field can be nil. If it is, we can safely assume
+ // there is no partition value
+ if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+ partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
+ }
+ if sts.Spec.Replicas != nil {
+ replicas = int(*sts.Spec.Replicas)
+ }
+
+ // Because an update strategy can use partitioning, we need to calculate the
+ // number of updated replicas we should have. For example, if the replicas
+ // is set to 3 and the partition is 2, we'd expect only one pod to be
+ // updated
+ expectedReplicas := replicas - partition
+
+ // Make sure all the updated pods have been scheduled
+ if int(sts.Status.UpdatedReplicas) != expectedReplicas {
+ log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
+ return false
+ }
+
+ if int(sts.Status.ReadyReplicas) != replicas {
+ log.Printf("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
+ return false
+ }
+ return true
+}
+
+func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
+ list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+ LabelSelector: selector,
+ })
+ return list.Items, err
+}
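
A compact sketch of driving ReadyChecker outside the plugin, against a fake clientset (the test file below does the same with richer fixtures). A kind IsReady does not handle, such as a ConfigMap, is reported ready:

    package main

    import (
        "context"
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/cli-runtime/pkg/resource"
        "k8s.io/client-go/kubernetes/fake"

        "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
    )

    func main() {
        checker := statuscheck.NewReadyChecker(fake.NewSimpleClientset(),
            statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true))
        cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm", Namespace: "default"}}
        info := &resource.Info{Namespace: "default", Name: "cm", Object: cm}
        ready, err := checker.IsReady(context.Background(), info)
        fmt.Println(ready, err) // true <nil>: ConfigMaps are not in the checked set
    }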
diff --git a/src/k8splugin/internal/statuscheck/ready_test.go b/src/k8splugin/internal/statuscheck/ready_test.go
new file mode 100644
index 00000000..e1db16f4
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/ready_test.go
@@ -0,0 +1,517 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck // copied from helm.sh/helm/v3/pkg/kube
+
+import (
+ "context"
+ "testing"
+
+ appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/client-go/kubernetes/fake"
+)
+
+const defaultNamespace = metav1.NamespaceDefault
+
+func Test_ReadyChecker_deploymentReady(t *testing.T) {
+ type args struct {
+ rs *appsv1.ReplicaSet
+ dep *appsv1.Deployment
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "deployment is ready",
+ args: args{
+ rs: newReplicaSet("foo", 1, 1),
+ dep: newDeployment("foo", 1, 1, 0),
+ },
+ want: true,
+ },
+ {
+ name: "deployment is not ready",
+ args: args{
+ rs: newReplicaSet("foo", 0, 0),
+ dep: newDeployment("foo", 1, 1, 0),
+ },
+ want: false,
+ },
+ {
+ name: "deployment is ready when maxUnavailable is set",
+ args: args{
+ rs: newReplicaSet("foo", 2, 1),
+ dep: newDeployment("foo", 2, 1, 1),
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want {
+ t.Errorf("deploymentReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_daemonSetReady(t *testing.T) {
+ type args struct {
+ ds *appsv1.DaemonSet
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "daemonset is ready",
+ args: args{
+ ds: newDaemonSet("foo", 0, 1, 1, 1),
+ },
+ want: true,
+ },
+ {
+ name: "daemonset is not ready",
+ args: args{
+ ds: newDaemonSet("foo", 0, 0, 1, 1),
+ },
+ want: false,
+ },
+ {
+ name: "daemonset pods have not been scheduled successfully",
+ args: args{
+ ds: newDaemonSet("foo", 0, 0, 1, 0),
+ },
+ want: false,
+ },
+ {
+ name: "daemonset is ready when maxUnavailable is set",
+ args: args{
+ ds: newDaemonSet("foo", 1, 1, 2, 2),
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.daemonSetReady(tt.args.ds); got != tt.want {
+ t.Errorf("daemonSetReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_statefulSetReady(t *testing.T) {
+ type args struct {
+ sts *appsv1.StatefulSet
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "statefulset is ready",
+ args: args{
+ sts: newStatefulSet("foo", 1, 0, 1, 1),
+ },
+ want: true,
+ },
+ {
+ name: "statefulset is not ready",
+ args: args{
+ sts: newStatefulSet("foo", 1, 0, 0, 1),
+ },
+ want: false,
+ },
+ {
+ name: "statefulset is ready when partition is specified",
+ args: args{
+ sts: newStatefulSet("foo", 2, 1, 2, 1),
+ },
+ want: true,
+ },
+ {
+ name: "statefulset is not ready when partition is set",
+ args: args{
+ sts: newStatefulSet("foo", 1, 1, 1, 1),
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.statefulSetReady(tt.args.sts); got != tt.want {
+ t.Errorf("statefulSetReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_podsReadyForObject(t *testing.T) {
+ type args struct {
+ namespace string
+ obj runtime.Object
+ }
+ tests := []struct {
+ name string
+ args args
+ existPods []corev1.Pod
+ want bool
+ wantErr bool
+ }{
+ {
+ name: "pods ready for a replicaset",
+ args: args{
+ namespace: defaultNamespace,
+ obj: newReplicaSet("foo", 1, 1),
+ },
+ existPods: []corev1.Pod{
+ *newPodWithCondition("foo", corev1.ConditionTrue),
+ },
+ want: true,
+ wantErr: false,
+ },
+ {
+ name: "pods not ready for a replicaset",
+ args: args{
+ namespace: defaultNamespace,
+ obj: newReplicaSet("foo", 1, 1),
+ },
+ existPods: []corev1.Pod{
+ *newPodWithCondition("foo", corev1.ConditionFalse),
+ },
+ want: false,
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ for _, pod := range tt.existPods {
+ if _, err := c.client.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil {
+ t.Errorf("Failed to create Pod error: %v", err)
+ return
+ }
+ }
+ got, err := c.podsReadyForObject(context.TODO(), tt.args.namespace, tt.args.obj)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_jobReady(t *testing.T) {
+ type args struct {
+ job *batchv1.Job
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "job is completed",
+ args: args{job: newJob("foo", 1, 1, 1, 0)},
+ want: true,
+ },
+ {
+ name: "job is incomplete",
+ args: args{job: newJob("foo", 1, 1, 0, 0)},
+ want: false,
+ },
+ {
+ name: "job is failed",
+ args: args{job: newJob("foo", 1, 1, 0, 1)},
+ want: false,
+ },
+ {
+ name: "job is completed with retry",
+ args: args{job: newJob("foo", 1, 1, 1, 1)},
+ want: true,
+ },
+ {
+ name: "job is failed with retry",
+ args: args{job: newJob("foo", 1, 1, 0, 2)},
+ want: false,
+ },
+ {
+ name: "job is completed single run",
+ args: args{job: newJob("foo", 0, 1, 1, 0)},
+ want: true,
+ },
+ {
+ name: "job is failed single run",
+ args: args{job: newJob("foo", 0, 1, 0, 1)},
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.jobReady(tt.args.job); got != tt.want {
+ t.Errorf("jobReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_ReadyChecker_volumeReady(t *testing.T) {
+ type args struct {
+ v *corev1.PersistentVolumeClaim
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "pvc is bound",
+ args: args{
+ v: newPersistentVolumeClaim("foo", corev1.ClaimBound),
+ },
+ want: true,
+ },
+ {
+ name: "pvc is not ready",
+ args: args{
+ v: newPersistentVolumeClaim("foo", corev1.ClaimPending),
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
+ if got := c.volumeReady(tt.args.v); got != tt.want {
+ t.Errorf("volumeReady() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet {
+ return &appsv1.DaemonSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.DaemonSetSpec{
+ UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+ Type: appsv1.RollingUpdateDaemonSetStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateDaemonSet{
+ MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
+ },
+ },
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: appsv1.DaemonSetStatus{
+ DesiredNumberScheduled: int32(desiredNumberScheduled),
+ NumberReady: int32(numberReady),
+ UpdatedNumberScheduled: int32(updatedNumberScheduled),
+ },
+ }
+}
+
+func newStatefulSet(name string, replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet {
+ return &appsv1.StatefulSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.StatefulSetSpec{
+ UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+ Type: appsv1.RollingUpdateStatefulSetStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
+ Partition: intToInt32(partition),
+ },
+ },
+ Replicas: intToInt32(replicas),
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: appsv1.StatefulSetStatus{
+ UpdatedReplicas: int32(updatedReplicas),
+ ReadyReplicas: int32(readyReplicas),
+ },
+ }
+}
+
+func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment {
+ return &appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: appsv1.DeploymentSpec{
+ Strategy: appsv1.DeploymentStrategy{
+ Type: appsv1.RollingUpdateDeploymentStrategyType,
+ RollingUpdate: &appsv1.RollingUpdateDeployment{
+ MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
+ MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(),
+ },
+ },
+ Replicas: intToInt32(replicas),
+ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet {
+ d := newDeployment(name, replicas, 0, 0)
+ return &appsv1.ReplicaSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ Labels: d.Spec.Selector.MatchLabels,
+ OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())},
+ },
+ Spec: appsv1.ReplicaSetSpec{
+ Selector: d.Spec.Selector,
+ Replicas: intToInt32(replicas),
+ Template: d.Spec.Template,
+ },
+ Status: appsv1.ReplicaSetStatus{
+ ReadyReplicas: int32(readyReplicas),
+ },
+ }
+}
+
+func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod {
+ return &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ Status: corev1.PodStatus{
+ Conditions: []corev1.PodCondition{
+ {
+ Type: corev1.PodReady,
+ Status: podReadyCondition,
+ },
+ },
+ },
+ }
+}
+
+func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
+ return &corev1.PersistentVolumeClaim{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Status: corev1.PersistentVolumeClaimStatus{
+ Phase: phase,
+ },
+ }
+}
+
+func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job {
+ return &batchv1.Job{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultNamespace,
+ },
+ Spec: batchv1.JobSpec{
+ BackoffLimit: intToInt32(backoffLimit),
+ Completions: intToInt32(completions),
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Labels: map[string]string{"name": name},
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Image: "nginx",
+ },
+ },
+ },
+ },
+ },
+ Status: batchv1.JobStatus{
+ Succeeded: int32(succeeded),
+ Failed: int32(failed),
+ },
+ }
+}
+
+func intToInt32(i int) *int32 {
+ i32 := int32(i)
+ return &i32
+}
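The fixture builders above all feed a ReadyChecker through a fake clientset. As a minimal sketch of how they compose on the Deployment path, assuming the checker keeps upstream Helm's deploymentReady(rs, dep) helper (that helper name comes from Helm's kube package, not from this hunk):

    func Test_ReadyChecker_deploymentReady_sketch(t *testing.T) {
    	c := NewReadyChecker(fake.NewSimpleClientset(), PausedAsReady(false), CheckJobs(false))
    	dep := newDeployment("foo", 3, 1, 0) // 3 replicas, maxSurge 1, maxUnavailable 0
    	rs := newReplicaSet("foo", 3, 3)     // owned by dep, all 3 replicas ready
    	if !c.deploymentReady(rs, dep) {
    		t.Error("expected fully rolled-out deployment to be ready")
    	}
    }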
diff --git a/src/k8splugin/internal/statuscheck/resource.go b/src/k8splugin/internal/statuscheck/resource.go
new file mode 100644
index 00000000..598af2fb
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/resource.go
@@ -0,0 +1,85 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck
+
+import "k8s.io/cli-runtime/pkg/resource"
+
+// ResourceList provides convenience methods for comparing collections of Infos.
+type ResourceList []*resource.Info
+
+// Append adds an Info to the ResourceList.
+func (r *ResourceList) Append(val *resource.Info) {
+ *r = append(*r, val)
+}
+
+// Visit implements resource.Visitor.
+func (r ResourceList) Visit(fn resource.VisitorFunc) error {
+ for _, i := range r {
+ if err := fn(i, nil); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Filter returns a new ResourceList with Infos that satisfy the predicate fn.
+func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList {
+ var result ResourceList
+ for _, i := range r {
+ if fn(i) {
+ result.Append(i)
+ }
+ }
+ return result
+}
+
+// Get returns the Info from the list that matches on name, namespace, and kind.
+func (r ResourceList) Get(info *resource.Info) *resource.Info {
+ for _, i := range r {
+ if isMatchingInfo(i, info) {
+ return i
+ }
+ }
+ return nil
+}
+
+// Contains checks whether a matching object exists in the list.
+func (r ResourceList) Contains(info *resource.Info) bool {
+ for _, i := range r {
+ if isMatchingInfo(i, info) {
+ return true
+ }
+ }
+ return false
+}
+
+// Difference will return a new ResourceList with objects not contained in rs.
+func (r ResourceList) Difference(rs ResourceList) ResourceList {
+ return r.Filter(func(info *resource.Info) bool {
+ return !rs.Contains(info)
+ })
+}
+
+// Intersect will return a new ResourceList with objects contained in both lists.
+func (r ResourceList) Intersect(rs ResourceList) ResourceList {
+ return r.Filter(rs.Contains)
+}
+
+// isMatchingInfo returns true if infos match on Name, Namespace, and Kind.
+func isMatchingInfo(a, b *resource.Info) bool {
+ return a.Name == b.Name && a.Namespace == b.Namespace && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
+}
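ResourceList matching goes by name, namespace, and kind rather than pointer identity, so lists built from separate render passes still compare correctly. A minimal usage sketch; the upgrade framing and the planUpgrade name are illustrative, not part of this change:

    // planUpgrade is a hypothetical helper: given the resources of the previous
    // and the next release, Difference yields what to delete and Intersect what
    // carries over.
    func planUpgrade(prev, next ResourceList) (toDelete, toKeep ResourceList) {
    	toDelete = prev.Difference(next) // in prev but absent from next
    	toKeep = prev.Intersect(next)    // present in both
    	return toDelete, toKeep
    }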
diff --git a/src/k8splugin/internal/statuscheck/resource_test.go b/src/k8splugin/internal/statuscheck/resource_test.go
new file mode 100644
index 00000000..532cebfd
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/resource_test.go
@@ -0,0 +1,61 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck
+
+import (
+ "testing"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/cli-runtime/pkg/resource"
+)
+
+func TestResourceList(t *testing.T) {
+ mapping := &meta.RESTMapping{
+ Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"},
+ }
+
+ info := func(name string) *resource.Info {
+ return &resource.Info{Name: name, Mapping: mapping}
+ }
+
+ var r1, r2 ResourceList
+ r1 = []*resource.Info{info("foo"), info("bar")}
+ r2 = []*resource.Info{info("bar")}
+
+ if r1.Get(info("bar")).Mapping.Resource.Resource != "pod" {
+ t.Error("expected get pod")
+ }
+
+ diff := r1.Difference(r2)
+ if len(diff) != 1 {
+ t.Error("expected 1 result")
+ }
+
+ if !diff.Contains(info("foo")) {
+ t.Error("expected diff to return foo")
+ }
+
+ inter := r1.Intersect(r2)
+ if len(inter) != 1 {
+ t.Error("expected 1 result")
+ }
+
+ if !inter.Contains(info("bar")) {
+ t.Error("expected intersect to return bar")
+ }
+}
diff --git a/src/k8splugin/internal/statuscheck/wait.go b/src/k8splugin/internal/statuscheck/wait.go
new file mode 100644
index 00000000..41d90d91
--- /dev/null
+++ b/src/k8splugin/internal/statuscheck/wait.go
@@ -0,0 +1,109 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package statuscheck
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/pkg/errors"
+ appsv1 "k8s.io/api/apps/v1"
+ appsv1beta1 "k8s.io/api/apps/v1beta1"
+ appsv1beta2 "k8s.io/api/apps/v1beta2"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+type waiter struct {
+ c ReadyChecker
+ timeout time.Duration
+ log func(string, ...interface{})
+}
+
+// waitForResources polls to get the current status of all pods, PVCs, Services and
+// Jobs (optional) until all are ready or a timeout is reached.
+func (w *waiter) waitForResources(created ResourceList) error {
+ w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
+ defer cancel()
+
+ return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
+ for _, v := range created {
+ ready, err := w.c.IsReady(ctx, v)
+ if !ready || err != nil {
+ return false, err
+ }
+ }
+ return true, nil
+ }, ctx.Done())
+}
+
+// SelectorsForObject returns the pod label selector for a given object
+//
+// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
+func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
+ switch t := object.(type) {
+ case *extensionsv1beta1.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.ReplicaSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *corev1.ReplicationController:
+ selector = labels.SelectorFromSet(t.Spec.Selector)
+ case *appsv1.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta1.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.StatefulSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *extensionsv1beta1.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.DaemonSet:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *extensionsv1beta1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta1.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *appsv1beta2.Deployment:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *batchv1.Job:
+ selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
+ case *corev1.Service:
+		if len(t.Spec.Selector) == 0 {
+ return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name)
+ }
+ selector = labels.SelectorFromSet(t.Spec.Selector)
+
+ default:
+ return nil, fmt.Errorf("selector for %T not implemented", object)
+ }
+
+ return selector, errors.Wrap(err, "invalid label selector")
+}
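Since waiter is unexported, it is driven from inside this package. A minimal sketch of the intended call pattern, assuming a kubernetes.Interface and an already-built ResourceList are in scope; the five-minute timeout and log.Printf sink are illustrative defaults, not values from this change:

    import (
    	"log"
    	"time"

    	"k8s.io/client-go/kubernetes"
    )

    func waitExample(clientset kubernetes.Interface, created ResourceList) error {
    	w := &waiter{
    		c:       NewReadyChecker(clientset, PausedAsReady(true), CheckJobs(true)),
    		timeout: 5 * time.Minute,
    		log:     log.Printf,
    	}
    	// polls every 2s (wait.PollImmediateUntil above) until every resource
    	// reports ready or the context deadline fires
    	return w.waitForResources(created)
    }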
diff --git a/src/k8splugin/internal/utils/deploymentutil.go b/src/k8splugin/internal/utils/deploymentutil.go
new file mode 100644
index 00000000..b5159c41
--- /dev/null
+++ b/src/k8splugin/internal/utils/deploymentutil.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "context"
+ "sort"
+
+ apps "k8s.io/api/apps/v1"
+ v1 "k8s.io/api/core/v1"
+ apiequality "k8s.io/apimachinery/pkg/api/equality"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ intstrutil "k8s.io/apimachinery/pkg/util/intstr"
+ appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
+)
+
+// deploymentutil contains a copy of a few functions from Kubernetes controller code to avoid a dependency on k8s.io/kubernetes.
+// This code is copied from https://github.com/kubernetes/kubernetes/blob/e856613dd5bb00bcfaca6974431151b5c06cbed5/pkg/controller/deployment/util/deployment_util.go
+// No changes to the code were made other than removing some unused functions
+
+// RsListFunc is a function that lists the ReplicaSets in the given namespace matching the given metav1.ListOptions.
+type RsListFunc func(string, metav1.ListOptions) ([]*apps.ReplicaSet, error)
+
+// ListReplicaSets returns a slice of RSes the given deployment targets.
+// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
+// because only the controller itself should do that.
+// However, it does filter out anything whose ControllerRef doesn't match.
+func ListReplicaSets(deployment *apps.Deployment, getRSList RsListFunc) ([]*apps.ReplicaSet, error) {
+ // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
+ // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
+ namespace := deployment.Namespace
+ selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
+ if err != nil {
+ return nil, err
+ }
+ options := metav1.ListOptions{LabelSelector: selector.String()}
+ all, err := getRSList(namespace, options)
+ if err != nil {
+ return nil, err
+ }
+ // Only include those whose ControllerRef matches the Deployment.
+ owned := make([]*apps.ReplicaSet, 0, len(all))
+ for _, rs := range all {
+ if metav1.IsControlledBy(rs, deployment) {
+ owned = append(owned, rs)
+ }
+ }
+ return owned, nil
+}
+
+// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
+type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet
+
+func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
+func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
+ if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
+ return o[i].Name < o[j].Name
+ }
+ return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
+}
+
+// FindNewReplicaSet returns the new RS the given deployment targets (the one with the same pod template).
+func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet {
+ sort.Sort(ReplicaSetsByCreationTimestamp(rsList))
+ for i := range rsList {
+ if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
+			// In rare cases, such as after cluster upgrades, a Deployment may end up
+			// having more than one new ReplicaSet with the same template as its own,
+ // see https://github.com/kubernetes/kubernetes/issues/40415
+ // We deterministically choose the oldest new ReplicaSet.
+ return rsList[i]
+ }
+ }
+ // new ReplicaSet does not exist.
+ return nil
+}
+
+// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
+// We ignore pod-template-hash because:
+// 1. The hash result would be different upon podTemplateSpec API changes
+// (e.g. the addition of a new field will cause the hash code to change)
+// 2. The deployment template won't have hash labels
+func EqualIgnoreHash(template1, template2 *v1.PodTemplateSpec) bool {
+ t1Copy := template1.DeepCopy()
+ t2Copy := template2.DeepCopy()
+ // Remove hash labels from template.Labels before comparing
+ delete(t1Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
+ delete(t2Copy.Labels, apps.DefaultDeploymentUniqueLabelKey)
+ return apiequality.Semantic.DeepEqual(t1Copy, t2Copy)
+}
+
+// GetNewReplicaSet returns a replica set that matches the intent of the given deployment, fetching the ReplicaSet list from the given client interface.
+// Returns nil if the new replica set doesn't exist yet.
+func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface) (*apps.ReplicaSet, error) {
+ rsList, err := ListReplicaSets(deployment, RsListFromClient(c))
+ if err != nil {
+ return nil, err
+ }
+ return FindNewReplicaSet(deployment, rsList), nil
+}
+
+// RsListFromClient returns an RsListFunc that wraps the given client.
+func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc {
+ return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) {
+ rsList, err := c.ReplicaSets(namespace).List(context.Background(), options)
+ if err != nil {
+ return nil, err
+ }
+ var ret []*apps.ReplicaSet
+ for i := range rsList.Items {
+ ret = append(ret, &rsList.Items[i])
+ }
+ return ret, err
+ }
+}
+
+// IsRollingUpdate returns true if the strategy type is a rolling update.
+func IsRollingUpdate(deployment *apps.Deployment) bool {
+ return deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType
+}
+
+// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
+func MaxUnavailable(deployment apps.Deployment) int32 {
+ if !IsRollingUpdate(&deployment) || *(deployment.Spec.Replicas) == 0 {
+ return int32(0)
+ }
+ // Error caught by validation
+ _, maxUnavailable, _ := ResolveFenceposts(deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, *(deployment.Spec.Replicas))
+ if maxUnavailable > *deployment.Spec.Replicas {
+ return *deployment.Spec.Replicas
+ }
+ return maxUnavailable
+}
+
+// ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
+// step. For example:
+//
+// 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1)
+// 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1)
+// 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
+// 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1)
+// 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
+// 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1)
+func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) {
+ surge, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxSurge, intstrutil.FromInt(0)), int(desired), true)
+ if err != nil {
+ return 0, 0, err
+ }
+ unavailable, err := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(maxUnavailable, intstrutil.FromInt(0)), int(desired), false)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if surge == 0 && unavailable == 0 {
+ // Validation should never allow the user to explicitly use zero values for both maxSurge
+		// and maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
+ // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the
+ // theory that surge might not work due to quota.
+ unavailable = 1
+ }
+
+ return int32(surge), int32(unavailable), nil
+}
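To make the fencepost arithmetic concrete: surge resolves by rounding up, unavailable by rounding down, and a 0/0 resolution is bumped to one unavailable so a rollout cannot deadlock. A small worked example (values chosen for illustration):

    surge := intstrutil.FromString("25%")
    unavail := intstrutil.FromString("25%")
    s, u, _ := ResolveFenceposts(&surge, &unavail, 10)
    // s == 3: 25% of 10 is 2.5, rounded up for surge
    // u == 2: 25% of 10 is 2.5, rounded down for unavailable
    // with "0%" for both, u would be bumped to 1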
diff --git a/src/k8splugin/internal/utils.go b/src/k8splugin/internal/utils/utils.go
index 174f8e79..174f8e79 100644
--- a/src/k8splugin/internal/utils.go
+++ b/src/k8splugin/internal/utils/utils.go
diff --git a/src/k8splugin/internal/utils_test.go b/src/k8splugin/internal/utils/utils_test.go
index 58b17bc6..908ce92e 100644
--- a/src/k8splugin/internal/utils_test.go
+++ b/src/k8splugin/internal/utils/utils_test.go
@@ -42,7 +42,7 @@ func TestDecodeYAML(t *testing.T) {
},
{
label: "Successfully read YAML file",
- input: "../mock_files/mock_yamls/deployment.yaml",
+ input: "../../mock_files/mock_yamls/deployment.yaml",
expectedResult: &appsV1.Deployment{
ObjectMeta: metaV1.ObjectMeta{
Name: "mock-deployment",