diff options
author | Srivahni Chivukula <srivahni.chivukula@intel.com> | 2020-04-07 17:52:05 -0700 |
---|---|---|
committer | Ritu Sood <ritu.sood@intel.com> | 2020-06-16 20:06:29 +0000 |
commit | 964db6a95cfdd82969f6af5a09822929a1862408 (patch) | |
tree | b8f31df98b537c32763a2c409d07eaec153f9372 /src/monitor/pkg/controller/resourcebundlestate | |
parent | dd6613ec4e4bbe79699f6b5802334f968dfb8306 (diff) |
Status operator to update status of resources
This operator monitors the status of
resources like pods, services, deployments,
daemonsets, configmaps etc. and updates the
status in the CR accordingly.
Issue-ID: MULTICLOUD-1047
Signed-off-by: Srivahni Chivukula <srivahni.chivukula@intel.com>
Change-Id: I7d92584a44c8add2df69f2985140a55b460ac037
Diffstat (limited to 'src/monitor/pkg/controller/resourcebundlestate')
19 files changed, 2028 insertions, 6 deletions
diff --git a/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go b/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go new file mode 100644 index 00000000..f93355af --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/configMap_controller.go @@ -0,0 +1,179 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddConfigMapController the new controller to the controller manager +func AddConfigMapController(mgr manager.Manager) error { + return addConfigMapController(mgr, newConfigMapReconciler(mgr)) +} + +func addConfigMapController(mgr manager.Manager, r *configMapReconciler) error { + // Create a new controller + c, err := controller.New("ConfigMap-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource ConfigMaps + // Predicate filters Service which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForObject{}, &configMapPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newConfigMapReconciler(m manager.Manager) *configMapReconciler { + return &configMapReconciler{client: m.GetClient()} +} + +type configMapReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the ConfigMaps we watch. +func (r *configMapReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for ConfigMap: %+v\n", req) + + cm := &corev1.ConfigMap{} + err := r.client.Get(context.TODO(), req.NamespacedName, cm) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("ConfigMap not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the ConfigMap's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the ConfigMap has been deleted. + r.deleteConfigMapFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get ConfigMap: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this ConfigMap via the labelselector + crSelector := returnLabel(cm.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this ConfigMap") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, cm) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteConfigMapFromAllCRs deletes ConfigMap status from all the CRs when the ConfigMap itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the ConfigMap's labels, we need to look at all the CRs in this namespace +func (r *configMapReconciler) deleteConfigMapFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *configMapReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, cm *corev1.ConfigMap) error { + + for _, cr := range crl.Items { + // ConfigMap is not scheduled for deletion + if cm.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, cm) + if err != nil { + return err + } + } else { + // ConfigMap is scheduled for deletion + r.deleteFromSingleCR(&cr, cm.Name) + } + } + + return nil +} + +func (r *configMapReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.ConfigMapStatuses) + for i, rstatus := range cr.Status.ConfigMapStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.ConfigMapStatuses[i] = cr.Status.ConfigMapStatuses[length-1] + cr.Status.ConfigMapStatuses = cr.Status.ConfigMapStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for ConfigMapStatuses in CR") + return nil +} + +func (r *configMapReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, cm *corev1.ConfigMap) error { + + // Update status after searching for it in the list of resourceStatuses + for _, rstatus := range cr.Status.ConfigMapStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == cm.Name { + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.ConfigMapStatuses = append(cr.Status.ConfigMapStatuses, corev1.ConfigMap{ + ObjectMeta: cm.ObjectMeta, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go new file mode 100644 index 00000000..b9b17738 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/configMap_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type configMapPredicate struct { +} + +func (c *configMapPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (c *configMapPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (c *configMapPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (c *configMapPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/controller.go b/src/monitor/pkg/controller/resourcebundlestate/controller.go index 71765e97..7206116b 100644 --- a/src/monitor/pkg/controller/resourcebundlestate/controller.go +++ b/src/monitor/pkg/controller/resourcebundlestate/controller.go @@ -6,7 +6,10 @@ import ( "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1beta1 "k8s.io/api/extensions/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -70,7 +73,49 @@ func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) err = r.updateServices(rbstate, rbstate.Spec.Selector.MatchLabels) if err != nil { - log.Printf("Error adding services: %v\n", err) + log.Printf("Error adding servicestatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateConfigMaps(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding configmapstatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateDeployments(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding deploymentstatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateSecrets(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding secretstatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateDaemonSets(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding daemonSetstatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateIngresses(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding ingressStatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateJobs(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding jobstatuses: %v\n", err) + return reconcile.Result{}, err + } + + err = r.updateStatefulSets(rbstate, rbstate.Spec.Selector.MatchLabels) + if err != nil { + log.Printf("Error adding statefulSetstatuses: %v\n", err) return reconcile.Result{}, err } @@ -96,7 +141,16 @@ func (r *reconciler) updateServices(rbstate *v1alpha1.ResourceBundleState, return err } - rbstate.Status.ServiceStatuses = serviceList.Items + rbstate.Status.ServiceStatuses = []corev1.Service{} + + for _, svc := range serviceList.Items { + resStatus := corev1.Service{ + ObjectMeta: svc.ObjectMeta, + Status: svc.Status, + } + rbstate.Status.ServiceStatuses = append(rbstate.Status.ServiceStatuses, resStatus) + } + return nil } @@ -124,3 +178,169 @@ func (r *reconciler) updatePods(rbstate *v1alpha1.ResourceBundleState, return nil } + +func (r *reconciler) updateConfigMaps(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the ConfigMaps created as well + configMapList := &corev1.ConfigMapList{} + err := listResources(r.client, rbstate.Namespace, selectors, configMapList) + if err != nil { + log.Printf("Failed to list configMaps: %v", err) + return err + } + + rbstate.Status.ConfigMapStatuses = []corev1.ConfigMap{} + + for _, cm := range configMapList.Items { + resStatus := corev1.ConfigMap{ + ObjectMeta: cm.ObjectMeta, + } + rbstate.Status.ConfigMapStatuses = append(rbstate.Status.ConfigMapStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateDeployments(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the Deployments created as well + deploymentList := &appsv1.DeploymentList{} + err := listResources(r.client, rbstate.Namespace, selectors, deploymentList) + if err != nil { + log.Printf("Failed to list deployments: %v", err) + return err + } + + rbstate.Status.DeploymentStatuses = []appsv1.Deployment{} + + for _, dep := range deploymentList.Items { + resStatus := appsv1.Deployment{ + ObjectMeta: dep.ObjectMeta, + Status: dep.Status, + } + rbstate.Status.DeploymentStatuses = append(rbstate.Status.DeploymentStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateSecrets(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the Secrets created as well + secretList := &corev1.SecretList{} + err := listResources(r.client, rbstate.Namespace, selectors, secretList) + if err != nil { + log.Printf("Failed to list secrets: %v", err) + return err + } + + rbstate.Status.SecretStatuses = []corev1.Secret{} + + for _, sec := range secretList.Items { + resStatus := corev1.Secret{ + ObjectMeta: sec.ObjectMeta, + } + rbstate.Status.SecretStatuses = append(rbstate.Status.SecretStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateDaemonSets(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the DaemonSets created as well + daemonSetList := &appsv1.DaemonSetList{} + err := listResources(r.client, rbstate.Namespace, selectors, daemonSetList) + if err != nil { + log.Printf("Failed to list DaemonSets: %v", err) + return err + } + + rbstate.Status.DaemonSetStatuses = []appsv1.DaemonSet{} + + for _, ds := range daemonSetList.Items { + resStatus := appsv1.DaemonSet{ + ObjectMeta: ds.ObjectMeta, + Status: ds.Status, + } + rbstate.Status.DaemonSetStatuses = append(rbstate.Status.DaemonSetStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateIngresses(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the Ingresses created as well + ingressList := &v1beta1.IngressList{} + err := listResources(r.client, rbstate.Namespace, selectors, ingressList) + if err != nil { + log.Printf("Failed to list ingresses: %v", err) + return err + } + + rbstate.Status.IngressStatuses = []v1beta1.Ingress{} + + for _, ing := range ingressList.Items { + resStatus := v1beta1.Ingress{ + ObjectMeta: ing.ObjectMeta, + Status: ing.Status, + } + rbstate.Status.IngressStatuses = append(rbstate.Status.IngressStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateJobs(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the Services created as well + jobList := &v1.JobList{} + err := listResources(r.client, rbstate.Namespace, selectors, jobList) + if err != nil { + log.Printf("Failed to list jobs: %v", err) + return err + } + + rbstate.Status.JobStatuses = []v1.Job{} + + for _, job := range jobList.Items { + resStatus := v1.Job{ + ObjectMeta: job.ObjectMeta, + Status: job.Status, + } + rbstate.Status.JobStatuses = append(rbstate.Status.JobStatuses, resStatus) + } + + return nil +} + +func (r *reconciler) updateStatefulSets(rbstate *v1alpha1.ResourceBundleState, + selectors map[string]string) error { + + // Update the CR with the StatefulSets created as well + statefulSetList := &appsv1.StatefulSetList{} + err := listResources(r.client, rbstate.Namespace, selectors, statefulSetList) + if err != nil { + log.Printf("Failed to list statefulSets: %v", err) + return err + } + + rbstate.Status.StatefulSetStatuses = []appsv1.StatefulSet{} + + for _, sfs := range statefulSetList.Items { + resStatus := appsv1.StatefulSet{ + ObjectMeta: sfs.ObjectMeta, + Status: sfs.Status, + } + rbstate.Status.StatefulSetStatuses = append(rbstate.Status.StatefulSetStatuses, resStatus) + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go new file mode 100644 index 00000000..3ccb40ce --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + appsv1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddDaemonSetController the new controller to the controller manager +func AddDaemonSetController(mgr manager.Manager) error { + return addDaemonSetController(mgr, newDaemonSetReconciler(mgr)) +} + +func addDaemonSetController(mgr manager.Manager, r *daemonSetReconciler) error { + // Create a new controller + c, err := controller.New("Daemonset-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource DaemonSets + // Predicate filters DaemonSets which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForObject{}, &daemonSetPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newDaemonSetReconciler(m manager.Manager) *daemonSetReconciler { + return &daemonSetReconciler{client: m.GetClient()} +} + +type daemonSetReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the daemonSets we watch. +func (r *daemonSetReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for DaemonSet: %+v\n", req) + + ds := &appsv1.DaemonSet{} + err := r.client.Get(context.TODO(), req.NamespacedName, ds) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("DaemonSet not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the DaemonSet's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the DaemonSet has been deleted. + r.deleteDaemonSetFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get daemonSet: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this daemonSet via the labelselector + crSelector := returnLabel(ds.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this DaemonSet") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, ds) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteDaemonSetFromAllCRs deletes daemonSet status from all the CRs when the DaemonSet itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the daemonSet's labels, we need to look at all the CRs in this namespace +func (r *daemonSetReconciler) deleteDaemonSetFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *daemonSetReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, ds *appsv1.DaemonSet) error { + + for _, cr := range crl.Items { + // DaemonSet is not scheduled for deletion + if ds.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, ds) + if err != nil { + return err + } + } else { + // DaemonSet is scheduled for deletion + r.deleteFromSingleCR(&cr, ds.Name) + } + } + + return nil +} + +func (r *daemonSetReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.DaemonSetStatuses) + for i, rstatus := range cr.Status.DaemonSetStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.DaemonSetStatuses[i] = cr.Status.DaemonSetStatuses[length-1] + cr.Status.DaemonSetStatuses[length-1].Status = appsv1.DaemonSetStatus{} + cr.Status.DaemonSetStatuses = cr.Status.DaemonSetStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for DaemonSet in CR") + return nil +} + +func (r *daemonSetReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, ds *appsv1.DaemonSet) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.DaemonSetStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == ds.Name { + ds.Status.DeepCopyInto(&cr.Status.DaemonSetStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.DaemonSetStatuses = append(cr.Status.DaemonSetStatuses, appsv1.DaemonSet{ + ObjectMeta: ds.ObjectMeta, + Status: ds.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go new file mode 100644 index 00000000..16a8bc54 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/daemonSet_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type daemonSetPredicate struct { +} + +func (d *daemonSetPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (d *daemonSetPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (d *daemonSetPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (d *daemonSetPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go b/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go new file mode 100644 index 00000000..c563ed77 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/deployment_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + appsv1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddDeploymentController the new controller to the controller manager +func AddDeploymentController(mgr manager.Manager) error { + return addDeploymentController(mgr, newDeploymentReconciler(mgr)) +} + +func addDeploymentController(mgr manager.Manager, r *deploymentReconciler) error { + // Create a new controller + c, err := controller.New("Deployment-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource Deployments + // Predicate filters Deployment which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}, &deploymentPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newDeploymentReconciler(m manager.Manager) *deploymentReconciler { + return &deploymentReconciler{client: m.GetClient()} +} + +type deploymentReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the deployments we watch. +func (r *deploymentReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for Deployment: %+v\n", req) + + dep := &appsv1.Deployment{} + err := r.client.Get(context.TODO(), req.NamespacedName, dep) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("Deployment not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the Deployment's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the Deployment has been deleted. + r.deleteDeploymentFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get deployment: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this deployment via the labelselector + crSelector := returnLabel(dep.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this Deployment") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, dep) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteDeploymentFromAllCRs deletes deployment status from all the CRs when the Deployment itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the deployment's labels, we need to look at all the CRs in this namespace +func (r *deploymentReconciler) deleteDeploymentFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *deploymentReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, dep *appsv1.Deployment) error { + + for _, cr := range crl.Items { + // Deployment is not scheduled for deletion + if dep.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, dep) + if err != nil { + return err + } + } else { + // Deployment is scheduled for deletion + r.deleteFromSingleCR(&cr, dep.Name) + } + } + + return nil +} + +func (r *deploymentReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.DeploymentStatuses) + for i, rstatus := range cr.Status.DeploymentStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.DeploymentStatuses[i] = cr.Status.DeploymentStatuses[length-1] + cr.Status.DeploymentStatuses[length-1].Status = appsv1.DeploymentStatus{} + cr.Status.DeploymentStatuses = cr.Status.DeploymentStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for Deployment in CR") + return nil +} + +func (r *deploymentReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, dep *appsv1.Deployment) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.DeploymentStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == dep.Name { + dep.Status.DeepCopyInto(&cr.Status.DeploymentStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.DeploymentStatuses = append(cr.Status.DeploymentStatuses, appsv1.Deployment{ + ObjectMeta: dep.ObjectMeta, + Status: dep.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go new file mode 100644 index 00000000..6061e93f --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/deployment_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type deploymentPredicate struct { +} + +func (d *deploymentPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (d *deploymentPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (d *deploymentPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (d *deploymentPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/helpers.go b/src/monitor/pkg/controller/resourcebundlestate/helpers.go index dab78825..5a5676f8 100644 --- a/src/monitor/pkg/controller/resourcebundlestate/helpers.go +++ b/src/monitor/pkg/controller/resourcebundlestate/helpers.go @@ -12,7 +12,7 @@ import ( // checkLabel verifies if the expected label exists and returns bool func checkLabel(labels map[string]string) bool { - _, ok := labels["k8splugin.io/rb-inst-id"] + _, ok := labels["emco/deployment-id"] if !ok { log.Printf("Pod does not have label. Filter it.") return false @@ -23,13 +23,13 @@ func checkLabel(labels map[string]string) bool { // returnLabel verifies if the expected label exists and returns a map func returnLabel(labels map[string]string) map[string]string { - l, ok := labels["k8splugin.io/rb-inst-id"] + l, ok := labels["emco/deployment-id"] if !ok { log.Printf("Pod does not have label. Filter it.") return nil } return map[string]string{ - "k8splugin.io/rb-inst-id": l, + "emco/deployment-id": l, } } diff --git a/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go b/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go new file mode 100644 index 00000000..603536b3 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/ingress_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + v1beta1 "k8s.io/api/extensions/v1beta1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddIngressController the new controller to the controller manager +func AddIngressController(mgr manager.Manager) error { + return addIngressController(mgr, newIngressReconciler(mgr)) +} + +func addIngressController(mgr manager.Manager, r *ingressReconciler) error { + // Create a new controller + c, err := controller.New("Ingress-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource Ingress + // Predicate filters Ingress which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &v1beta1.Ingress{}}, &handler.EnqueueRequestForObject{}, &ingressPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newIngressReconciler(m manager.Manager) *ingressReconciler { + return &ingressReconciler{client: m.GetClient()} +} + +type ingressReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the ingress we watch. +func (r *ingressReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for Ingress: %+v\n", req) + + ing := &v1beta1.Ingress{} + err := r.client.Get(context.TODO(), req.NamespacedName, ing) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("Ingress not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the Ingress's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the Ingress has been deleted. + r.deleteIngressFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get ingress: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this Ingress via the labelselector + crSelector := returnLabel(ing.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this Ingress") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, ing) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteIngressFromAllCRs deletes ingress status from all the CRs when the Ingress itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the Ingress's labels, we need to look at all the CRs in this namespace +func (r *ingressReconciler) deleteIngressFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *ingressReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, ing *v1beta1.Ingress) error { + + for _, cr := range crl.Items { + // Ingress is not scheduled for deletion + if ing.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, ing) + if err != nil { + return err + } + } else { + // Ingress is scheduled for deletion + r.deleteFromSingleCR(&cr, ing.Name) + } + } + + return nil +} + +func (r *ingressReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.IngressStatuses) + for i, rstatus := range cr.Status.IngressStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.IngressStatuses[i] = cr.Status.IngressStatuses[length-1] + cr.Status.IngressStatuses[length-1].Status = v1beta1.IngressStatus{} + cr.Status.IngressStatuses = cr.Status.IngressStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for Ingress in CR") + return nil +} + +func (r *ingressReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, ing *v1beta1.Ingress) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.IngressStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == ing.Name { + ing.Status.DeepCopyInto(&cr.Status.IngressStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.IngressStatuses = append(cr.Status.IngressStatuses, v1beta1.Ingress{ + ObjectMeta: ing.ObjectMeta, + Status: ing.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go new file mode 100644 index 00000000..9a41c842 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/ingress_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type ingressPredicate struct { +} + +func (i *ingressPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (i *ingressPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (i *ingressPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (i *ingressPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/job_controller.go b/src/monitor/pkg/controller/resourcebundlestate/job_controller.go new file mode 100644 index 00000000..cd76e66f --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/job_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + v1 "k8s.io/api/batch/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddJobController the new controller to the controller manager +func AddJobController(mgr manager.Manager) error { + return addJobController(mgr, newJobReconciler(mgr)) +} + +func addJobController(mgr manager.Manager, r *jobReconciler) error { + // Create a new controller + c, err := controller.New("Job-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource Jobs + // Predicate filters Job which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &v1.Job{}}, &handler.EnqueueRequestForObject{}, &jobPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newJobReconciler(m manager.Manager) *jobReconciler { + return &jobReconciler{client: m.GetClient()} +} + +type jobReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the jobs we watch. +func (r *jobReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for Job: %+v\n", req) + + job := &v1.Job{} + err := r.client.Get(context.TODO(), req.NamespacedName, job) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("Job not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the Job's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the Job has been deleted. + r.deleteJobFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get Job: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this Job via the labelselector + crSelector := returnLabel(job.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this Job") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, job) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteJobFromAllCRs deletes job status from all the CRs when the Job itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the job's labels, we need to look at all the CRs in this namespace +func (r *jobReconciler) deleteJobFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *jobReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, job *v1.Job) error { + + for _, cr := range crl.Items { + // Job is not scheduled for deletion + if job.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, job) + if err != nil { + return err + } + } else { + // Job is scheduled for deletion + r.deleteFromSingleCR(&cr, job.Name) + } + } + + return nil +} + +func (r *jobReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.JobStatuses) + for i, rstatus := range cr.Status.JobStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.JobStatuses[i] = cr.Status.JobStatuses[length-1] + cr.Status.JobStatuses[length-1].Status = v1.JobStatus{} + cr.Status.JobStatuses = cr.Status.JobStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for Job in CR") + return nil +} + +func (r *jobReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, job *v1.Job) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.JobStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == job.Name { + job.Status.DeepCopyInto(&cr.Status.JobStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.JobStatuses = append(cr.Status.JobStatuses, v1.Job{ + ObjectMeta: job.ObjectMeta, + Status: job.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go new file mode 100644 index 00000000..14841532 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/job_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type jobPredicate struct { +} + +func (j *jobPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (j *jobPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (j *jobPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (j *jobPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go index ba3685f7..65a324db 100644 --- a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go +++ b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go @@ -24,7 +24,7 @@ func AddPodController(mgr manager.Manager) error { func addPodController(mgr manager.Manager, r *podReconciler) error { // Create a new controller - c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.New("Pod-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } diff --git a/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go b/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go new file mode 100644 index 00000000..fe70d53f --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/secret_controller.go @@ -0,0 +1,179 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddSecretController the new controller to the controller manager +func AddSecretController(mgr manager.Manager) error { + return addSecretController(mgr, newSecretReconciler(mgr)) +} + +func addSecretController(mgr manager.Manager, r *secretReconciler) error { + // Create a new controller + c, err := controller.New("Secret-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource Secret + // Predicate filters Secret which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForObject{}, &secretPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newSecretReconciler(m manager.Manager) *secretReconciler { + return &secretReconciler{client: m.GetClient()} +} + +type secretReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the Secrets we watch. +func (r *secretReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for Secret: %+v\n", req) + + sec := &corev1.Secret{} + err := r.client.Get(context.TODO(), req.NamespacedName, sec) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("Secret not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the Secret's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the Secret has been deleted. + r.deleteSecretFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get Secret: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this Secret via the labelselector + crSelector := returnLabel(sec.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this Secret") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, sec) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteSecretFromAllCRs deletes Secret status from all the CRs when the Secret itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the Secret's labels, we need to look at all the CRs in this namespace +func (r *secretReconciler) deleteSecretFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *secretReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, sec *corev1.Secret) error { + + for _, cr := range crl.Items { + // Secret is not scheduled for deletion + if sec.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, sec) + if err != nil { + return err + } + } else { + // Secret is scheduled for deletion + r.deleteFromSingleCR(&cr, sec.Name) + } + } + + return nil +} + +func (r *secretReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.SecretStatuses) + for i, rstatus := range cr.Status.SecretStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.SecretStatuses[i] = cr.Status.SecretStatuses[length-1] + cr.Status.SecretStatuses = cr.Status.SecretStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for SecretStatuses in CR") + return nil +} + +func (r *secretReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, sec *corev1.Secret) error { + + // Update status after searching for it in the list of resourceStatuses + for _, rstatus := range cr.Status.SecretStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == sec.Name { + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.SecretStatuses = append(cr.Status.SecretStatuses, corev1.Secret{ + ObjectMeta: sec.ObjectMeta, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go new file mode 100644 index 00000000..e25e1114 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/secret_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type secretPredicate struct { +} + +func (s *secretPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *secretPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *secretPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (s *secretPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/service_controller.go b/src/monitor/pkg/controller/resourcebundlestate/service_controller.go new file mode 100644 index 00000000..d1bb2fd6 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/service_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddServiceController the new controller to the controller manager +func AddServiceController(mgr manager.Manager) error { + return addServiceController(mgr, newServiceReconciler(mgr)) +} + +func addServiceController(mgr manager.Manager, r *serviceReconciler) error { + // Create a new controller + c, err := controller.New("Service-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource Services + // Predicate filters Service which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForObject{}, &servicePredicate{}) + if err != nil { + return err + } + + return nil +} + +func newServiceReconciler(m manager.Manager) *serviceReconciler { + return &serviceReconciler{client: m.GetClient()} +} + +type serviceReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the services we watch. +func (r *serviceReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for Service: %+v\n", req) + + svc := &corev1.Service{} + err := r.client.Get(context.TODO(), req.NamespacedName, svc) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("Service not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the Service's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the Service has been deleted. + r.deleteServiceFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get service: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this service via the labelselector + crSelector := returnLabel(svc.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this Service") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, svc) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteServiceFromAllCRs deletes service status from all the CRs when the Service itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the service's labels, we need to look at all the CRs in this namespace +func (r *serviceReconciler) deleteServiceFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *serviceReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, svc *corev1.Service) error { + + for _, cr := range crl.Items { + // Service is not scheduled for deletion + if svc.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, svc) + if err != nil { + return err + } + } else { + // Service is scheduled for deletion + r.deleteFromSingleCR(&cr, svc.Name) + } + } + + return nil +} + +func (r *serviceReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.ServiceStatuses) + for i, rstatus := range cr.Status.ServiceStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.ServiceStatuses[i] = cr.Status.ServiceStatuses[length-1] + cr.Status.ServiceStatuses[length-1].Status = corev1.ServiceStatus{} + cr.Status.ServiceStatuses = cr.Status.ServiceStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for Service in CR") + return nil +} + +func (r *serviceReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, svc *corev1.Service) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.ServiceStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == svc.Name { + svc.Status.DeepCopyInto(&cr.Status.ServiceStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.ServiceStatuses = append(cr.Status.ServiceStatuses, corev1.Service{ + ObjectMeta: svc.ObjectMeta, + Status: svc.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go new file mode 100644 index 00000000..d427443f --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/service_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type servicePredicate struct { +} + +func (s *servicePredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *servicePredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *servicePredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (s *servicePredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go new file mode 100644 index 00000000..ebe61dba --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_controller.go @@ -0,0 +1,182 @@ +package resourcebundlestate + +import ( + "context" + "log" + + "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" + + appsv1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddStatefulSetController the new controller to the controller manager +func AddStatefulSetController(mgr manager.Manager) error { + return addStatefulSetController(mgr, newStatefulSetReconciler(mgr)) +} + +func addStatefulSetController(mgr manager.Manager, r *statefulSetReconciler) error { + // Create a new controller + c, err := controller.New("Statefulset-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + // Watch for changes to secondar resource StatefulSets + // Predicate filters StatefulSet which don't have the k8splugin label + err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForObject{}, &statefulSetPredicate{}) + if err != nil { + return err + } + + return nil +} + +func newStatefulSetReconciler(m manager.Manager) *statefulSetReconciler { + return &statefulSetReconciler{client: m.GetClient()} +} + +type statefulSetReconciler struct { + client client.Client +} + +// Reconcile implements the loop that will update the ResourceBundleState CR +// whenever we get any updates from all the StatefulSets we watch. +func (r *statefulSetReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) { + log.Printf("Updating ResourceBundleState for StatefulSet: %+v\n", req) + + sfs := &appsv1.StatefulSet{} + err := r.client.Get(context.TODO(), req.NamespacedName, sfs) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Printf("StatefulSet not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName) + // Remove the StatefulSet's status from StatusList + // This can happen if we get the DeletionTimeStamp event + // after the StatefulSet has been deleted. + r.deleteStatefulSetFromAllCRs(req.NamespacedName) + return reconcile.Result{}, nil + } + log.Printf("Failed to get statefulSet: %+v\n", req.NamespacedName) + return reconcile.Result{}, err + } + + // Find the CRs which track this statefulSet via the labelselector + crSelector := returnLabel(sfs.GetLabels()) + if crSelector == nil { + log.Println("We should not be here. The predicate should have filtered this StatefulSet") + } + + // Get the CRs which have this label and update them all + // Ideally, we will have only one CR, but there is nothing + // preventing the creation of multiple. + // TODO: Consider using an admission validating webook to prevent multiple + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err = listResources(r.client, req.Namespace, crSelector, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return reconcile.Result{}, nil + } + + err = r.updateCRs(rbStatusList, sfs) + if err != nil { + // Requeue the update + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// deleteStatefulSetFromAllCRs deletes statefulSet status from all the CRs when the StatefulSet itself has been deleted +// and we have not handled the updateCRs yet. +// Since, we don't have the statefulSet's labels, we need to look at all the CRs in this namespace +func (r *statefulSetReconciler) deleteStatefulSetFromAllCRs(namespacedName types.NamespacedName) error { + + rbStatusList := &v1alpha1.ResourceBundleStateList{} + err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList) + if err != nil || len(rbStatusList.Items) == 0 { + log.Printf("Did not find any CRs tracking this resource\n") + return nil + } + for _, cr := range rbStatusList.Items { + r.deleteFromSingleCR(&cr, namespacedName.Name) + } + + return nil +} + +func (r *statefulSetReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, sfs *appsv1.StatefulSet) error { + + for _, cr := range crl.Items { + // StatefulSet is not scheduled for deletion + if sfs.DeletionTimestamp == nil { + err := r.updateSingleCR(&cr, sfs) + if err != nil { + return err + } + } else { + // StatefulSet is scheduled for deletion + r.deleteFromSingleCR(&cr, sfs.Name) + } + } + + return nil +} + +func (r *statefulSetReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error { + cr.Status.ResourceCount-- + length := len(cr.Status.StatefulSetStatuses) + for i, rstatus := range cr.Status.StatefulSetStatuses { + if rstatus.Name == name { + //Delete that status from the array + cr.Status.StatefulSetStatuses[i] = cr.Status.StatefulSetStatuses[length-1] + cr.Status.StatefulSetStatuses[length-1].Status = appsv1.StatefulSetStatus{} + cr.Status.StatefulSetStatuses = cr.Status.StatefulSetStatuses[:length-1] + return nil + } + } + + log.Println("Did not find a status for StatefulSet in CR") + return nil +} + +func (r *statefulSetReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, sfs *appsv1.StatefulSet) error { + + // Update status after searching for it in the list of resourceStatuses + for i, rstatus := range cr.Status.StatefulSetStatuses { + // Look for the status if we already have it in the CR + if rstatus.Name == sfs.Name { + sfs.Status.DeepCopyInto(&cr.Status.StatefulSetStatuses[i].Status) + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + return nil + } + } + + // Exited for loop with no status found + // Increment the number of tracked resources + cr.Status.ResourceCount++ + + // Add it to CR + cr.Status.StatefulSetStatuses = append(cr.Status.StatefulSetStatuses, appsv1.StatefulSet{ + ObjectMeta: sfs.ObjectMeta, + Status: sfs.Status, + }) + + err := r.client.Status().Update(context.TODO(), cr) + if err != nil { + log.Printf("failed to update rbstate: %v\n", err) + return err + } + + return nil +} diff --git a/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go new file mode 100644 index 00000000..af313131 --- /dev/null +++ b/src/monitor/pkg/controller/resourcebundlestate/statefulSet_predicate.go @@ -0,0 +1,44 @@ +package resourcebundlestate + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" +) + +type statefulSetPredicate struct { +} + +func (s *statefulSetPredicate) Create(evt event.CreateEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *statefulSetPredicate) Delete(evt event.DeleteEvent) bool { + + if evt.Meta == nil { + return false + } + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} + +func (s *statefulSetPredicate) Update(evt event.UpdateEvent) bool { + + if evt.MetaNew == nil { + return false + } + + labels := evt.MetaNew.GetLabels() + return checkLabel(labels) +} + +func (s *statefulSetPredicate) Generic(evt event.GenericEvent) bool { + + labels := evt.Meta.GetLabels() + return checkLabel(labels) +} |