aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDileep Ranganathan <dileep.ranganathan@intel.com>2019-09-25 14:37:41 -0700
committerMarco Platania <platania@research.att.com>2019-10-01 12:47:15 +0000
commit604eccdeb964c1b640692f832f74d9f9cf8f478e (patch)
treefc01078709447874538841506e19b16e6e2ef2de
parent7e8edf7f0f52169cbda36b45dfa67db2d3e34eab (diff)
Added Daemonset Status predicate
Added Daemonset Status predicate to optimize watch on DS. Minor fixes on status update for the controllers by coalescing status and CR update together. Issue-ID: ONAPARC-461 Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com> Change-Id: I2a56f0b93c2d7a56b9e8149c41f8c6f22be86ef1
-rw-r--r--vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go24
-rw-r--r--vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go55
-rw-r--r--vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go71
-rw-r--r--vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go2
-rw-r--r--vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go65
5 files changed, 164 insertions, 53 deletions
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
index 0c803967..85e49f43 100644
--- a/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
+++ b/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
@@ -11,12 +11,12 @@ import (
func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
return map[string]common.OpenAPIDefinition{
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobal": schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref),
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec": schema_pkg_apis_onap_v1alpha1_CollectdGlobalSpec(ref),
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus": schema_pkg_apis_onap_v1alpha1_CollectdGlobalStatus(ref),
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPlugin": schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref),
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec": schema_pkg_apis_onap_v1alpha1_CollectdPluginSpec(ref),
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus": schema_pkg_apis_onap_v1alpha1_CollectdPluginStatus(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdGlobal": schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdGlobalSpec": schema_pkg_apis_onap_v1alpha1_CollectdGlobalSpec(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdGlobalStatus": schema_pkg_apis_onap_v1alpha1_CollectdGlobalStatus(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdPlugin": schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdPluginSpec": schema_pkg_apis_onap_v1alpha1_CollectdPluginSpec(ref),
+ "./pkg/apis/onap/v1alpha1.CollectdPluginStatus": schema_pkg_apis_onap_v1alpha1_CollectdPluginStatus(ref),
}
}
@@ -47,19 +47,19 @@ func schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref common.ReferenceCallback)
},
"spec": {
SchemaProps: spec.SchemaProps{
- Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec"),
+ Ref: ref("./pkg/apis/onap/v1alpha1.CollectdGlobalSpec"),
},
},
"status": {
SchemaProps: spec.SchemaProps{
- Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus"),
+ Ref: ref("./pkg/apis/onap/v1alpha1.CollectdGlobalStatus"),
},
},
},
},
},
Dependencies: []string{
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec", "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+ "./pkg/apis/onap/v1alpha1.CollectdGlobalSpec", "./pkg/apis/onap/v1alpha1.CollectdGlobalStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
}
}
@@ -151,19 +151,19 @@ func schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref common.ReferenceCallback)
},
"spec": {
SchemaProps: spec.SchemaProps{
- Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec"),
+ Ref: ref("./pkg/apis/onap/v1alpha1.CollectdPluginSpec"),
},
},
"status": {
SchemaProps: spec.SchemaProps{
- Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus"),
+ Ref: ref("./pkg/apis/onap/v1alpha1.CollectdPluginStatus"),
},
},
},
},
},
Dependencies: []string{
- "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec", "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+ "./pkg/apis/onap/v1alpha1.CollectdPluginSpec", "./pkg/apis/onap/v1alpha1.CollectdPluginStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
}
}
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go
index 0d3e2bbf..8c6023b8 100644
--- a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go
+++ b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go
@@ -12,6 +12,7 @@ import (
onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
collectdutils "collectd-operator/pkg/controller/utils"
+ dspredicate "collectd-operator/pkg/controller/utils"
dsutils "collectd-operator/pkg/controller/utils"
appsv1 "k8s.io/api/apps/v1"
@@ -69,22 +70,13 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
log.Error(err, "Failed to get watch labels, continuing with default label")
}
rcp := r.(*ReconcileCollectdGlobal)
- // Select the Daemonset with labelSelector (Defautl is app=collectd)
+ // Select the Daemonset with labelSelector (Default is app=collectd)
if a.Meta.GetLabels()[labels[0]] == labels[1] {
var requests []reconcile.Request
cg, err := collectdutils.GetCollectdGlobal(rcp.client, a.Meta.GetNamespace())
if err != nil || cg == nil {
log.V(1).Info("No CollectdGlobal CR instance Exist")
- cpList, err := collectdutils.GetCollectdPluginList(rcp.client, a.Meta.GetNamespace())
- if err != nil || cpList == nil || cpList.Items == nil || len(cpList.Items) == 0 {
- log.V(1).Info("No CollectdPlugin CR instance Exist")
- return nil
- }
- for _, cp := range cpList.Items {
- requests = append(requests, reconcile.Request{
- NamespacedName: client.ObjectKey{Namespace: cp.Namespace, Name: cp.Name}})
- }
- return requests
+ return nil
}
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{Namespace: cg.Namespace, Name: cg.Name}})
@@ -92,7 +84,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return nil
}),
- }, predicate.GenerationChangedPredicate{})
+ }, dspredicate.DaemonSetStatusChangedPredicate{})
if err != nil {
return err
}
@@ -224,10 +216,13 @@ func (r *ReconcileCollectdGlobal) handleCollectdGlobal(reqLogger logr.Logger, cr
panic(fmt.Errorf("Update failed: %v", retryErr))
}
- err := r.updateStatus(cr)
- if err != nil {
- reqLogger.Error(err, "Unable to update status")
+ retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ err := r.updateStatus(cr)
return err
+ })
+
+ if retryErr != nil {
+ panic(fmt.Errorf("Update failed: %v", retryErr))
}
// Reconcile success
reqLogger.Info("Reconcile success!!")
@@ -266,22 +261,29 @@ func (r *ReconcileCollectdGlobal) handleDelete(reqLogger logr.Logger, cr *onapv1
}
func (r *ReconcileCollectdGlobal) updateStatus(cr *onapv1alpha1.CollectdGlobal) error {
- switch cr.Status.Status {
+ // Fetch the CollectdGlobal instance
+ instance := &onapv1alpha1.CollectdGlobal{}
+ key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
+ err := r.client.Get(context.TODO(), key, instance)
+ if err != nil {
+ return err
+ }
+ switch instance.Status.Status {
case onapv1alpha1.Initial:
- cr.Status.Status = onapv1alpha1.Created
+ instance.Status.Status = onapv1alpha1.Created
case onapv1alpha1.Created, onapv1alpha1.Enabled:
- pods, err := collectdutils.GetPodList(r.client, cr.Namespace)
+ pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
if err != nil {
return err
}
- if !reflect.DeepEqual(pods, cr.Status.CollectdAgents) {
- cr.Status.CollectdAgents = pods
- cr.Status.Status = onapv1alpha1.Enabled
+ if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
+ instance.Status.CollectdAgents = pods
+ instance.Status.Status = onapv1alpha1.Enabled
}
case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
return nil
}
- err := r.client.Status().Update(context.TODO(), cr)
+ err = r.client.Status().Update(context.TODO(), instance)
return err
}
@@ -301,11 +303,12 @@ func (r *ReconcileCollectdGlobal) addFinalizer(reqLogger logr.Logger, cr *onapv1
// Update status from Initial to Created
// Since addFinalizer will be executed only once,
// the status will be changed from Initial state to Created
- updateErr := r.updateStatus(cr)
- if updateErr != nil {
- reqLogger.Error(updateErr, "Failed to update status from Initial state")
- }
+ // updateErr := r.updateStatus(cr)
+ // if updateErr != nil {
+ // reqLogger.Error(updateErr, "Failed to update status from Initial state")
+ // }
// Update CR
+ cr.Status.Status = onapv1alpha1.Created
err := r.client.Update(context.TODO(), cr)
if err != nil {
reqLogger.Error(err, "Failed to update CollectdGlobal with finalizer")
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go
index 32775c5c..e0e62b87 100644
--- a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go
+++ b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go
@@ -4,17 +4,20 @@ import (
"context"
"fmt"
"reflect"
+ "strings"
"github.com/go-logr/logr"
"github.com/operator-framework/operator-sdk/pkg/predicate"
onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
collectdutils "collectd-operator/pkg/controller/utils"
+ dspredicate "collectd-operator/pkg/controller/utils"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -54,6 +57,35 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
+ log.V(1).Info("Add watcher for secondary resource Collectd Daemonset")
+ err = c.Watch(
+ &source.Kind{Type: &appsv1.DaemonSet{}},
+ &handler.EnqueueRequestsFromMapFunc{
+ ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
+ labelSelector, err := collectdutils.GetWatchLabels()
+ labels := strings.Split(labelSelector, "=")
+ if err != nil {
+ log.Error(err, "Failed to get watch labels, continuing with default label")
+ }
+ rcp := r.(*ReconcileCollectdPlugin)
+ // Select the Daemonset with labelSelector (Default is app=collectd)
+ if a.Meta.GetLabels()[labels[0]] == labels[1] {
+ var requests []reconcile.Request
+ cpList, err := collectdutils.GetCollectdPluginList(rcp.client, a.Meta.GetNamespace())
+ if err != nil || cpList == nil || cpList.Items == nil || len(cpList.Items) == 0 {
+ log.V(1).Info("No CollectdPlugin CR instance Exist")
+ return nil
+ }
+ for _, cp := range cpList.Items {
+ requests = append(requests, reconcile.Request{
+ NamespacedName: client.ObjectKey{Namespace: cp.Namespace, Name: cp.Name}})
+ }
+ return requests
+ }
+ return nil
+ }),
+ }, dspredicate.DaemonSetStatusChangedPredicate{})
+
return nil
}
@@ -178,10 +210,13 @@ func (r *ReconcileCollectdPlugin) handleCollectdPlugin(reqLogger logr.Logger, cr
panic(fmt.Errorf("Update failed: %v", retryErr))
}
- err := r.updateStatus(cr)
- if err != nil {
- reqLogger.Error(err, "Unable to update status")
+ retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ err := r.updateStatus(cr)
return err
+ })
+
+ if retryErr != nil {
+ panic(fmt.Errorf("Update failed: %v", retryErr))
}
// Reconcile success
reqLogger.Info("Reconcile success!!")
@@ -220,22 +255,29 @@ func (r *ReconcileCollectdPlugin) handleDelete(reqLogger logr.Logger, cr *onapv1
}
func (r *ReconcileCollectdPlugin) updateStatus(cr *onapv1alpha1.CollectdPlugin) error {
- switch cr.Status.Status {
+ // Fetch the CollectdGlobal instance
+ instance := &onapv1alpha1.CollectdPlugin{}
+ key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
+ err := r.client.Get(context.TODO(), key, instance)
+ if err != nil {
+ return err
+ }
+ switch instance.Status.Status {
case onapv1alpha1.Initial:
- cr.Status.Status = onapv1alpha1.Created
+ instance.Status.Status = onapv1alpha1.Created
case onapv1alpha1.Created, onapv1alpha1.Enabled:
- pods, err := collectdutils.GetPodList(r.client, cr.Namespace)
+ pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
if err != nil {
return err
}
- if !reflect.DeepEqual(pods, cr.Status.CollectdAgents) {
- cr.Status.CollectdAgents = pods
- cr.Status.Status = onapv1alpha1.Enabled
+ if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
+ instance.Status.CollectdAgents = pods
+ instance.Status.Status = onapv1alpha1.Enabled
}
case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
return nil
}
- err := r.client.Status().Update(context.TODO(), cr)
+ err = r.client.Status().Update(context.TODO(), instance)
return err
}
@@ -255,11 +297,12 @@ func (r *ReconcileCollectdPlugin) addFinalizer(reqLogger logr.Logger, cr *onapv1
// Update status from Initial to Created
// Since addFinalizer will be executed only once,
// the status will be changed from Initial state to Created
- updateErr := r.updateStatus(cr)
- if updateErr != nil {
- reqLogger.Error(updateErr, "Failed to update status from Initial state")
- }
+ // updateErr := r.updateStatus(cr)
+ // if updateErr != nil {
+ // reqLogger.Error(updateErr, "Failed to update status from Initial state")
+ // }
// Update CR
+ cr.Status.Status = onapv1alpha1.Created
err := r.client.Update(context.TODO(), cr)
if err != nil {
reqLogger.Error(err, "Failed to update CollectdPlugin with finalizer")
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go
index 17cad0e5..b379d916 100644
--- a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go
+++ b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go
@@ -211,7 +211,7 @@ func RebuildCollectdConf(rc client.Client, ns string, isDelete bool, delPlugin s
collectdConf += cpConf + "\n"
}
- collectdConf += "#Last line (collectd requires ā€˜\\nā€™ at the last line)\n"
+ collectdConf += "#Last line (collectd requires '\\n' at the last line)\n"
return collectdConf, nil
}
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go
new file mode 100644
index 00000000..a9ec1dc4
--- /dev/null
+++ b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go
@@ -0,0 +1,65 @@
+// Copyright 2018 The Operator-SDK Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+ appsv1 "k8s.io/api/apps/v1"
+
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+ logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+)
+
+var plog = logf.Log.WithName("predicate").WithName("eventFilters")
+
+// DaemonSetStatusChangedPredicate implements a default update predicate function on status change for Daemonsets
+// (adapted from sigs.k8s.io/controller-runtime/pkg/predicate/predicate.GenerationChangedPredicate)
+type DaemonSetStatusChangedPredicate struct {
+ predicate.Funcs
+}
+
+// Update implements default UpdateEvent filter for validating generation change
+func (DaemonSetStatusChangedPredicate) Update(e event.UpdateEvent) bool {
+ newDS := e.ObjectNew.DeepCopyObject().(*appsv1.DaemonSet)
+ oldDS := e.ObjectOld.DeepCopyObject().(*appsv1.DaemonSet)
+ plog.V(2).Info("newDS", "nUNS:=", newDS.Status.UpdatedNumberScheduled, "oUNS:=", oldDS.Status.UpdatedNumberScheduled, "nDNS:=", newDS.Status.DesiredNumberScheduled, "nNR:=", newDS.Status.NumberReady, "nNA:=", newDS.Status.NumberAvailable)
+ if newDS.Status.UpdatedNumberScheduled >= oldDS.Status.UpdatedNumberScheduled {
+ if (newDS.Status.UpdatedNumberScheduled == newDS.Status.NumberReady) &&
+ (newDS.Status.UpdatedNumberScheduled == newDS.Status.NumberAvailable) {
+ return true
+ }
+ }
+ if e.MetaOld == nil {
+ plog.Error(nil, "Update event has no old metadata", "event", e)
+ return false
+ }
+ if e.ObjectOld == nil {
+ plog.Error(nil, "Update event has no old runtime object to update", "event", e)
+ return false
+ }
+ if e.ObjectNew == nil {
+ plog.Error(nil, "Update event has no new runtime object for update", "event", e)
+ return false
+ }
+ if e.MetaNew == nil {
+ plog.Error(nil, "Update event has no new metadata", "event", e)
+ return false
+ }
+ if e.MetaNew.GetGeneration() == e.MetaOld.GetGeneration() {
+ return false
+ }
+
+ return true
+}