summaryrefslogtreecommitdiffstats
path: root/src/monitor/pkg/controller/resourcebundlestate
diff options
context:
space:
mode:
authorSrivahni Chivukula <srivahni.chivukula@intel.com>2020-04-07 17:52:05 -0700
committerRitu Sood <ritu.sood@intel.com>2020-06-16 20:06:29 +0000
commit964db6a95cfdd82969f6af5a09822929a1862408 (patch)
treeb8f31df98b537c32763a2c409d07eaec153f9372 /src/monitor/pkg/controller/resourcebundlestate
parentdd6613ec4e4bbe79699f6b5802334f968dfb8306 (diff)
Status operator to update status of resources
This operator monitors the status of resources like pods, services, deployments, daemonsets, configmaps etc. and updates the status in the CR accordingly. Issue-ID: MULTICLOUD-1047 Signed-off-by: Srivahni Chivukula <srivahni.chivukula@intel.com> Change-Id: I7d92584a44c8add2df69f2985140a55b460ac037
Diffstat (limited to 'src/monitor/pkg/controller/resourcebundlestate')
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go179
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/controller.go224
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/helpers.go6
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/job_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/job_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_controller.go2
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/secret_controller.go179
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/service_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/service_predicate.go44
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go182
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go44
19 files changed, 2028 insertions, 6 deletions
diff --git a/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go b/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go
new file mode 100644
index 00000000..f93355af
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go
@@ -0,0 +1,179 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddConfigMapController the new controller to the controller manager
+func AddConfigMapController(mgr manager.Manager) error {
+ return addConfigMapController(mgr, newConfigMapReconciler(mgr))
+}
+
+func addConfigMapController(mgr manager.Manager, r *configMapReconciler) error {
+ // Create a new controller
+ c, err := controller.New("ConfigMap-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource ConfigMaps
+ // Predicate filters Service which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForObject{}, &configMapPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newConfigMapReconciler(m manager.Manager) *configMapReconciler {
+ return &configMapReconciler{client: m.GetClient()}
+}
+
+type configMapReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the ConfigMaps we watch.
+func (r *configMapReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for ConfigMap: %+v\n", req)
+
+ cm := &corev1.ConfigMap{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, cm)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("ConfigMap not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the ConfigMap's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the ConfigMap has been deleted.
+ r.deleteConfigMapFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get ConfigMap: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this ConfigMap via the labelselector
+ crSelector := returnLabel(cm.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this ConfigMap")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, cm)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteConfigMapFromAllCRs deletes ConfigMap status from all the CRs when the ConfigMap itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the ConfigMap's labels, we need to look at all the CRs in this namespace
+func (r *configMapReconciler) deleteConfigMapFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *configMapReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, cm *corev1.ConfigMap) error {
+
+ for _, cr := range crl.Items {
+ // ConfigMap is not scheduled for deletion
+ if cm.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, cm)
+ if err != nil {
+ return err
+ }
+ } else {
+ // ConfigMap is scheduled for deletion
+ r.deleteFromSingleCR(&cr, cm.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *configMapReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.ConfigMapStatuses)
+ for i, rstatus := range cr.Status.ConfigMapStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.ConfigMapStatuses[i] = cr.Status.ConfigMapStatuses[length-1]
+ cr.Status.ConfigMapStatuses = cr.Status.ConfigMapStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for ConfigMapStatuses in CR")
+ return nil
+}
+
+func (r *configMapReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, cm *corev1.ConfigMap) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for _, rstatus := range cr.Status.ConfigMapStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == cm.Name {
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.ConfigMapStatuses = append(cr.Status.ConfigMapStatuses, corev1.ConfigMap{
+ ObjectMeta: cm.ObjectMeta,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go
new file mode 100644
index 00000000..b9b17738
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type configMapPredicate struct {
+}
+
+func (c *configMapPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (c *configMapPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (c *configMapPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (c *configMapPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/controller.go b/src/monitor/pkg/controller/resourcebundlestate/controller.go
index 71765e97..7206116b 100644
--- a/src/monitor/pkg/controller/resourcebundlestate/controller.go
+++ b/src/monitor/pkg/controller/resourcebundlestate/controller.go
@@ -6,7 +6,10 @@ import (
"github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+ appsv1 "k8s.io/api/apps/v1"
+ v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
+ v1beta1 "k8s.io/api/extensions/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -70,7 +73,49 @@ func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error)
err = r.updateServices(rbstate, rbstate.Spec.Selector.MatchLabels)
if err != nil {
- log.Printf("Error adding services: %v\n", err)
+ log.Printf("Error adding servicestatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateConfigMaps(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding configmapstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateDeployments(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding deploymentstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateSecrets(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding secretstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateDaemonSets(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding daemonSetstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateIngresses(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding ingressStatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateJobs(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding jobstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateStatefulSets(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding statefulSetstatuses: %v\n", err)
return reconcile.Result{}, err
}
@@ -96,7 +141,16 @@ func (r *reconciler) updateServices(rbstate *v1alpha1.ResourceBundleState,
return err
}
- rbstate.Status.ServiceStatuses = serviceList.Items
+ rbstate.Status.ServiceStatuses = []corev1.Service{}
+
+ for _, svc := range serviceList.Items {
+ resStatus := corev1.Service{
+ ObjectMeta: svc.ObjectMeta,
+ Status: svc.Status,
+ }
+ rbstate.Status.ServiceStatuses = append(rbstate.Status.ServiceStatuses, resStatus)
+ }
+
return nil
}
@@ -124,3 +178,169 @@ func (r *reconciler) updatePods(rbstate *v1alpha1.ResourceBundleState,
return nil
}
+
+func (r *reconciler) updateConfigMaps(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the ConfigMaps created as well
+ configMapList := &corev1.ConfigMapList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, configMapList)
+ if err != nil {
+ log.Printf("Failed to list configMaps: %v", err)
+ return err
+ }
+
+ rbstate.Status.ConfigMapStatuses = []corev1.ConfigMap{}
+
+ for _, cm := range configMapList.Items {
+ resStatus := corev1.ConfigMap{
+ ObjectMeta: cm.ObjectMeta,
+ }
+ rbstate.Status.ConfigMapStatuses = append(rbstate.Status.ConfigMapStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateDeployments(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Deployments created as well
+ deploymentList := &appsv1.DeploymentList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, deploymentList)
+ if err != nil {
+ log.Printf("Failed to list deployments: %v", err)
+ return err
+ }
+
+ rbstate.Status.DeploymentStatuses = []appsv1.Deployment{}
+
+ for _, dep := range deploymentList.Items {
+ resStatus := appsv1.Deployment{
+ ObjectMeta: dep.ObjectMeta,
+ Status: dep.Status,
+ }
+ rbstate.Status.DeploymentStatuses = append(rbstate.Status.DeploymentStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateSecrets(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Secrets created as well
+ secretList := &corev1.SecretList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, secretList)
+ if err != nil {
+ log.Printf("Failed to list secrets: %v", err)
+ return err
+ }
+
+ rbstate.Status.SecretStatuses = []corev1.Secret{}
+
+ for _, sec := range secretList.Items {
+ resStatus := corev1.Secret{
+ ObjectMeta: sec.ObjectMeta,
+ }
+ rbstate.Status.SecretStatuses = append(rbstate.Status.SecretStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateDaemonSets(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the DaemonSets created as well
+ daemonSetList := &appsv1.DaemonSetList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, daemonSetList)
+ if err != nil {
+ log.Printf("Failed to list DaemonSets: %v", err)
+ return err
+ }
+
+ rbstate.Status.DaemonSetStatuses = []appsv1.DaemonSet{}
+
+ for _, ds := range daemonSetList.Items {
+ resStatus := appsv1.DaemonSet{
+ ObjectMeta: ds.ObjectMeta,
+ Status: ds.Status,
+ }
+ rbstate.Status.DaemonSetStatuses = append(rbstate.Status.DaemonSetStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateIngresses(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Ingresses created as well
+ ingressList := &v1beta1.IngressList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, ingressList)
+ if err != nil {
+ log.Printf("Failed to list ingresses: %v", err)
+ return err
+ }
+
+ rbstate.Status.IngressStatuses = []v1beta1.Ingress{}
+
+ for _, ing := range ingressList.Items {
+ resStatus := v1beta1.Ingress{
+ ObjectMeta: ing.ObjectMeta,
+ Status: ing.Status,
+ }
+ rbstate.Status.IngressStatuses = append(rbstate.Status.IngressStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateJobs(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Services created as well
+ jobList := &v1.JobList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, jobList)
+ if err != nil {
+ log.Printf("Failed to list jobs: %v", err)
+ return err
+ }
+
+ rbstate.Status.JobStatuses = []v1.Job{}
+
+ for _, job := range jobList.Items {
+ resStatus := v1.Job{
+ ObjectMeta: job.ObjectMeta,
+ Status: job.Status,
+ }
+ rbstate.Status.JobStatuses = append(rbstate.Status.JobStatuses, resStatus)
+ }
+
+ return nil
+}
+
+func (r *reconciler) updateStatefulSets(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the StatefulSets created as well
+ statefulSetList := &appsv1.StatefulSetList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, statefulSetList)
+ if err != nil {
+ log.Printf("Failed to list statefulSets: %v", err)
+ return err
+ }
+
+ rbstate.Status.StatefulSetStatuses = []appsv1.StatefulSet{}
+
+ for _, sfs := range statefulSetList.Items {
+ resStatus := appsv1.StatefulSet{
+ ObjectMeta: sfs.ObjectMeta,
+ Status: sfs.Status,
+ }
+ rbstate.Status.StatefulSetStatuses = append(rbstate.Status.StatefulSetStatuses, resStatus)
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go
new file mode 100644
index 00000000..3ccb40ce
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ appsv1 "k8s.io/api/apps/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddDaemonSetController the new controller to the controller manager
+func AddDaemonSetController(mgr manager.Manager) error {
+ return addDaemonSetController(mgr, newDaemonSetReconciler(mgr))
+}
+
+func addDaemonSetController(mgr manager.Manager, r *daemonSetReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Daemonset-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource DaemonSets
+ // Predicate filters DaemonSets which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForObject{}, &daemonSetPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newDaemonSetReconciler(m manager.Manager) *daemonSetReconciler {
+ return &daemonSetReconciler{client: m.GetClient()}
+}
+
+type daemonSetReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the daemonSets we watch.
+func (r *daemonSetReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for DaemonSet: %+v\n", req)
+
+ ds := &appsv1.DaemonSet{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, ds)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("DaemonSet not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the DaemonSet's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the DaemonSet has been deleted.
+ r.deleteDaemonSetFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get daemonSet: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this daemonSet via the labelselector
+ crSelector := returnLabel(ds.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this DaemonSet")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, ds)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteDaemonSetFromAllCRs deletes daemonSet status from all the CRs when the DaemonSet itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the daemonSet's labels, we need to look at all the CRs in this namespace
+func (r *daemonSetReconciler) deleteDaemonSetFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *daemonSetReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, ds *appsv1.DaemonSet) error {
+
+ for _, cr := range crl.Items {
+ // DaemonSet is not scheduled for deletion
+ if ds.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, ds)
+ if err != nil {
+ return err
+ }
+ } else {
+ // DaemonSet is scheduled for deletion
+ r.deleteFromSingleCR(&cr, ds.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *daemonSetReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.DaemonSetStatuses)
+ for i, rstatus := range cr.Status.DaemonSetStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.DaemonSetStatuses[i] = cr.Status.DaemonSetStatuses[length-1]
+ cr.Status.DaemonSetStatuses[length-1].Status = appsv1.DaemonSetStatus{}
+ cr.Status.DaemonSetStatuses = cr.Status.DaemonSetStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for DaemonSet in CR")
+ return nil
+}
+
+func (r *daemonSetReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, ds *appsv1.DaemonSet) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.DaemonSetStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == ds.Name {
+ ds.Status.DeepCopyInto(&cr.Status.DaemonSetStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.DaemonSetStatuses = append(cr.Status.DaemonSetStatuses, appsv1.DaemonSet{
+ ObjectMeta: ds.ObjectMeta,
+ Status: ds.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go
new file mode 100644
index 00000000..16a8bc54
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type daemonSetPredicate struct {
+}
+
+func (d *daemonSetPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *daemonSetPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *daemonSetPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *daemonSetPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go b/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go
new file mode 100644
index 00000000..c563ed77
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ appsv1 "k8s.io/api/apps/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddDeploymentController the new controller to the controller manager
+func AddDeploymentController(mgr manager.Manager) error {
+ return addDeploymentController(mgr, newDeploymentReconciler(mgr))
+}
+
+func addDeploymentController(mgr manager.Manager, r *deploymentReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Deployment-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Deployments
+ // Predicate filters Deployment which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}, &deploymentPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newDeploymentReconciler(m manager.Manager) *deploymentReconciler {
+ return &deploymentReconciler{client: m.GetClient()}
+}
+
+type deploymentReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the deployments we watch.
+func (r *deploymentReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Deployment: %+v\n", req)
+
+ dep := &appsv1.Deployment{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, dep)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Deployment not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Deployment's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the Deployment has been deleted.
+ r.deleteDeploymentFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get deployment: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this deployment via the labelselector
+ crSelector := returnLabel(dep.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Deployment")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, dep)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteDeploymentFromAllCRs deletes deployment status from all the CRs when the Deployment itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the deployment's labels, we need to look at all the CRs in this namespace
+func (r *deploymentReconciler) deleteDeploymentFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *deploymentReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, dep *appsv1.Deployment) error {
+
+ for _, cr := range crl.Items {
+ // Deployment is not scheduled for deletion
+ if dep.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, dep)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Deployment is scheduled for deletion
+ r.deleteFromSingleCR(&cr, dep.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *deploymentReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.DeploymentStatuses)
+ for i, rstatus := range cr.Status.DeploymentStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.DeploymentStatuses[i] = cr.Status.DeploymentStatuses[length-1]
+ cr.Status.DeploymentStatuses[length-1].Status = appsv1.DeploymentStatus{}
+ cr.Status.DeploymentStatuses = cr.Status.DeploymentStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for Deployment in CR")
+ return nil
+}
+
+func (r *deploymentReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, dep *appsv1.Deployment) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.DeploymentStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == dep.Name {
+ dep.Status.DeepCopyInto(&cr.Status.DeploymentStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.DeploymentStatuses = append(cr.Status.DeploymentStatuses, appsv1.Deployment{
+ ObjectMeta: dep.ObjectMeta,
+ Status: dep.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go
new file mode 100644
index 00000000..6061e93f
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type deploymentPredicate struct {
+}
+
+func (d *deploymentPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *deploymentPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *deploymentPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (d *deploymentPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/helpers.go b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
index dab78825..5a5676f8 100644
--- a/src/monitor/pkg/controller/resourcebundlestate/helpers.go
+++ b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
@@ -12,7 +12,7 @@ import (
// checkLabel verifies if the expected label exists and returns bool
func checkLabel(labels map[string]string) bool {
- _, ok := labels["k8splugin.io/rb-inst-id"]
+ _, ok := labels["emco/deployment-id"]
if !ok {
log.Printf("Pod does not have label. Filter it.")
return false
@@ -23,13 +23,13 @@ func checkLabel(labels map[string]string) bool {
// returnLabel verifies if the expected label exists and returns a map
func returnLabel(labels map[string]string) map[string]string {
- l, ok := labels["k8splugin.io/rb-inst-id"]
+ l, ok := labels["emco/deployment-id"]
if !ok {
log.Printf("Pod does not have label. Filter it.")
return nil
}
return map[string]string{
- "k8splugin.io/rb-inst-id": l,
+ "emco/deployment-id": l,
}
}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go b/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go
new file mode 100644
index 00000000..603536b3
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ v1beta1 "k8s.io/api/extensions/v1beta1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddIngressController the new controller to the controller manager
+func AddIngressController(mgr manager.Manager) error {
+ return addIngressController(mgr, newIngressReconciler(mgr))
+}
+
+func addIngressController(mgr manager.Manager, r *ingressReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Ingress-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Ingress
+ // Predicate filters Ingress which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &v1beta1.Ingress{}}, &handler.EnqueueRequestForObject{}, &ingressPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newIngressReconciler(m manager.Manager) *ingressReconciler {
+ return &ingressReconciler{client: m.GetClient()}
+}
+
+type ingressReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the ingress we watch.
+func (r *ingressReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Ingress: %+v\n", req)
+
+ ing := &v1beta1.Ingress{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, ing)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Ingress not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Ingress's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the Ingress has been deleted.
+ r.deleteIngressFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get ingress: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this Ingress via the labelselector
+ crSelector := returnLabel(ing.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Ingress")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, ing)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteIngressFromAllCRs deletes ingress status from all the CRs when the Ingress itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the Ingress's labels, we need to look at all the CRs in this namespace
+func (r *ingressReconciler) deleteIngressFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *ingressReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, ing *v1beta1.Ingress) error {
+
+ for _, cr := range crl.Items {
+ // Ingress is not scheduled for deletion
+ if ing.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, ing)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Ingress is scheduled for deletion
+ r.deleteFromSingleCR(&cr, ing.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *ingressReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.IngressStatuses)
+ for i, rstatus := range cr.Status.IngressStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.IngressStatuses[i] = cr.Status.IngressStatuses[length-1]
+ cr.Status.IngressStatuses[length-1].Status = v1beta1.IngressStatus{}
+ cr.Status.IngressStatuses = cr.Status.IngressStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for Ingress in CR")
+ return nil
+}
+
+func (r *ingressReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, ing *v1beta1.Ingress) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.IngressStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == ing.Name {
+ ing.Status.DeepCopyInto(&cr.Status.IngressStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.IngressStatuses = append(cr.Status.IngressStatuses, v1beta1.Ingress{
+ ObjectMeta: ing.ObjectMeta,
+ Status: ing.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go
new file mode 100644
index 00000000..9a41c842
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type ingressPredicate struct {
+}
+
+func (i *ingressPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (i *ingressPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (i *ingressPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (i *ingressPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/job_controller.go b/src/monitor/pkg/controller/resourcebundlestate/job_controller.go
new file mode 100644
index 00000000..cd76e66f
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/job_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ v1 "k8s.io/api/batch/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddJobController the new controller to the controller manager
+func AddJobController(mgr manager.Manager) error {
+ return addJobController(mgr, newJobReconciler(mgr))
+}
+
+func addJobController(mgr manager.Manager, r *jobReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Job-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Jobs
+ // Predicate filters Job which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &v1.Job{}}, &handler.EnqueueRequestForObject{}, &jobPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newJobReconciler(m manager.Manager) *jobReconciler {
+ return &jobReconciler{client: m.GetClient()}
+}
+
+type jobReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the jobs we watch.
+func (r *jobReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Job: %+v\n", req)
+
+ job := &v1.Job{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, job)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Job not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Job's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the Job has been deleted.
+ r.deleteJobFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get Job: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this Job via the labelselector
+ crSelector := returnLabel(job.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Job")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, job)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteJobFromAllCRs deletes job status from all the CRs when the Job itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the job's labels, we need to look at all the CRs in this namespace
+func (r *jobReconciler) deleteJobFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *jobReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, job *v1.Job) error {
+
+ for _, cr := range crl.Items {
+ // Job is not scheduled for deletion
+ if job.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, job)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Job is scheduled for deletion
+ r.deleteFromSingleCR(&cr, job.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *jobReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.JobStatuses)
+ for i, rstatus := range cr.Status.JobStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.JobStatuses[i] = cr.Status.JobStatuses[length-1]
+ cr.Status.JobStatuses[length-1].Status = v1.JobStatus{}
+ cr.Status.JobStatuses = cr.Status.JobStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for Job in CR")
+ return nil
+}
+
+func (r *jobReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, job *v1.Job) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.JobStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == job.Name {
+ job.Status.DeepCopyInto(&cr.Status.JobStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.JobStatuses = append(cr.Status.JobStatuses, v1.Job{
+ ObjectMeta: job.ObjectMeta,
+ Status: job.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go
new file mode 100644
index 00000000..14841532
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type jobPredicate struct {
+}
+
+func (j *jobPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (j *jobPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (j *jobPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (j *jobPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
index ba3685f7..65a324db 100644
--- a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
@@ -24,7 +24,7 @@ func AddPodController(mgr manager.Manager) error {
func addPodController(mgr manager.Manager, r *podReconciler) error {
// Create a new controller
- c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ c, err := controller.New("Pod-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go b/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go
new file mode 100644
index 00000000..fe70d53f
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go
@@ -0,0 +1,179 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddSecretController the new controller to the controller manager
+func AddSecretController(mgr manager.Manager) error {
+ return addSecretController(mgr, newSecretReconciler(mgr))
+}
+
+func addSecretController(mgr manager.Manager, r *secretReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Secret-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Secret
+ // Predicate filters Secret which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForObject{}, &secretPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newSecretReconciler(m manager.Manager) *secretReconciler {
+ return &secretReconciler{client: m.GetClient()}
+}
+
+type secretReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the Secrets we watch.
+func (r *secretReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Secret: %+v\n", req)
+
+ sec := &corev1.Secret{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, sec)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Secret not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Secret's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the Secret has been deleted.
+ r.deleteSecretFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get Secret: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this Secret via the labelselector
+ crSelector := returnLabel(sec.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Secret")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, sec)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteSecretFromAllCRs deletes Secret status from all the CRs when the Secret itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the Secret's labels, we need to look at all the CRs in this namespace
+func (r *secretReconciler) deleteSecretFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *secretReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, sec *corev1.Secret) error {
+
+ for _, cr := range crl.Items {
+ // Secret is not scheduled for deletion
+ if sec.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, sec)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Secret is scheduled for deletion
+ r.deleteFromSingleCR(&cr, sec.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *secretReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.SecretStatuses)
+ for i, rstatus := range cr.Status.SecretStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.SecretStatuses[i] = cr.Status.SecretStatuses[length-1]
+ cr.Status.SecretStatuses = cr.Status.SecretStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for SecretStatuses in CR")
+ return nil
+}
+
+func (r *secretReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, sec *corev1.Secret) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for _, rstatus := range cr.Status.SecretStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == sec.Name {
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.SecretStatuses = append(cr.Status.SecretStatuses, corev1.Secret{
+ ObjectMeta: sec.ObjectMeta,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go
new file mode 100644
index 00000000..e25e1114
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type secretPredicate struct {
+}
+
+func (s *secretPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *secretPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *secretPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *secretPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/service_controller.go b/src/monitor/pkg/controller/resourcebundlestate/service_controller.go
new file mode 100644
index 00000000..d1bb2fd6
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/service_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddServiceController the new controller to the controller manager
+func AddServiceController(mgr manager.Manager) error {
+ return addServiceController(mgr, newServiceReconciler(mgr))
+}
+
+func addServiceController(mgr manager.Manager, r *serviceReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Service-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Services
+ // Predicate filters Service which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForObject{}, &servicePredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newServiceReconciler(m manager.Manager) *serviceReconciler {
+ return &serviceReconciler{client: m.GetClient()}
+}
+
+type serviceReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the services we watch.
+func (r *serviceReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Service: %+v\n", req)
+
+ svc := &corev1.Service{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, svc)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Service not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Service's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the Service has been deleted.
+ r.deleteServiceFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get service: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this service via the labelselector
+ crSelector := returnLabel(svc.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Service")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, svc)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteServiceFromAllCRs deletes service status from all the CRs when the Service itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the service's labels, we need to look at all the CRs in this namespace
+func (r *serviceReconciler) deleteServiceFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *serviceReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, svc *corev1.Service) error {
+
+ for _, cr := range crl.Items {
+ // Service is not scheduled for deletion
+ if svc.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, svc)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Service is scheduled for deletion
+ r.deleteFromSingleCR(&cr, svc.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *serviceReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.ServiceStatuses)
+ for i, rstatus := range cr.Status.ServiceStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.ServiceStatuses[i] = cr.Status.ServiceStatuses[length-1]
+ cr.Status.ServiceStatuses[length-1].Status = corev1.ServiceStatus{}
+ cr.Status.ServiceStatuses = cr.Status.ServiceStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for Service in CR")
+ return nil
+}
+
+func (r *serviceReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, svc *corev1.Service) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.ServiceStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == svc.Name {
+ svc.Status.DeepCopyInto(&cr.Status.ServiceStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.ServiceStatuses = append(cr.Status.ServiceStatuses, corev1.Service{
+ ObjectMeta: svc.ObjectMeta,
+ Status: svc.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go
new file mode 100644
index 00000000..d427443f
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type servicePredicate struct {
+}
+
+func (s *servicePredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *servicePredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *servicePredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *servicePredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go
new file mode 100644
index 00000000..ebe61dba
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go
@@ -0,0 +1,182 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+
+ appsv1 "k8s.io/api/apps/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddStatefulSetController the new controller to the controller manager
+func AddStatefulSetController(mgr manager.Manager) error {
+ return addStatefulSetController(mgr, newStatefulSetReconciler(mgr))
+}
+
+func addStatefulSetController(mgr manager.Manager, r *statefulSetReconciler) error {
+ // Create a new controller
+ c, err := controller.New("Statefulset-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource StatefulSets
+ // Predicate filters StatefulSet which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForObject{}, &statefulSetPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newStatefulSetReconciler(m manager.Manager) *statefulSetReconciler {
+ return &statefulSetReconciler{client: m.GetClient()}
+}
+
+type statefulSetReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the StatefulSets we watch.
+func (r *statefulSetReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for StatefulSet: %+v\n", req)
+
+ sfs := &appsv1.StatefulSet{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, sfs)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("StatefulSet not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the StatefulSet's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the StatefulSet has been deleted.
+ r.deleteStatefulSetFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get statefulSet: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this statefulSet via the labelselector
+ crSelector := returnLabel(sfs.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this StatefulSet")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, sfs)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deleteStatefulSetFromAllCRs deletes statefulSet status from all the CRs when the StatefulSet itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the statefulSet's labels, we need to look at all the CRs in this namespace
+func (r *statefulSetReconciler) deleteStatefulSetFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *statefulSetReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, sfs *appsv1.StatefulSet) error {
+
+ for _, cr := range crl.Items {
+ // StatefulSet is not scheduled for deletion
+ if sfs.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, sfs)
+ if err != nil {
+ return err
+ }
+ } else {
+ // StatefulSet is scheduled for deletion
+ r.deleteFromSingleCR(&cr, sfs.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *statefulSetReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.StatefulSetStatuses)
+ for i, rstatus := range cr.Status.StatefulSetStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.StatefulSetStatuses[i] = cr.Status.StatefulSetStatuses[length-1]
+ cr.Status.StatefulSetStatuses[length-1].Status = appsv1.StatefulSetStatus{}
+ cr.Status.StatefulSetStatuses = cr.Status.StatefulSetStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for StatefulSet in CR")
+ return nil
+}
+
+func (r *statefulSetReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, sfs *appsv1.StatefulSet) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.StatefulSetStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == sfs.Name {
+ sfs.Status.DeepCopyInto(&cr.Status.StatefulSetStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.StatefulSetStatuses = append(cr.Status.StatefulSetStatuses, appsv1.StatefulSet{
+ ObjectMeta: sfs.ObjectMeta,
+ Status: sfs.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go
new file mode 100644
index 00000000..af313131
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type statefulSetPredicate struct {
+}
+
+func (s *statefulSetPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *statefulSetPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *statefulSetPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (s *statefulSetPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}