aboutsummaryrefslogtreecommitdiffstats
path: root/src/monitor/pkg/controller/resourcebundlestate
diff options
context:
space:
mode:
authorKiran Kamineni <kiran.k.kamineni@intel.com>2019-07-31 15:32:28 -0700
committerKiran Kamineni <kiran.k.kamineni@intel.com>2019-08-28 12:45:14 -0700
commitab8c95eff5c1228237f758d3ccfc99c751f713f7 (patch)
treea950ac5356f547434f24c8cc3bf53f3620e84a74 /src/monitor/pkg/controller/resourcebundlestate
parentdcace0784979890bb986fb078348b4b3ceef146c (diff)
Adding monitor operator to monitor edge resources
Add an operator to monitor resources at the edge location. The operator listens to pods and services right now and stores their information in a CustomResource Issue-ID: MULTICLOUD-675 Change-Id: I801478a77fcd019010ea1b4388d6077f63b89d05 Signed-off-by: Kiran Kamineni <kiran.k.kamineni@intel.com>
Diffstat (limited to 'src/monitor/pkg/controller/resourcebundlestate')
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/controller.go126
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/handler.go28
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/helpers.go54
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_controller.go183
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go44
5 files changed, 435 insertions, 0 deletions
diff --git a/src/monitor/pkg/controller/resourcebundlestate/controller.go b/src/monitor/pkg/controller/resourcebundlestate/controller.go
new file mode 100644
index 00000000..debd5f65
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/controller.go
@@ -0,0 +1,126 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// Add the new controller to the controller manager
+func Add(mgr manager.Manager) error {
+ return add(mgr, newReconciler(mgr))
+}
+
+func add(mgr manager.Manager, r *reconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to primary resource ResourceBundleState
+ err = c.Watch(&source.Kind{Type: &v1alpha1.ResourceBundleState{}}, &EventHandler{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newReconciler(m manager.Manager) *reconciler {
+ return &reconciler{client: m.GetClient()}
+}
+
+type reconciler struct {
+ // Stores an array of all the ResourceBundleState
+ crList []v1alpha1.ResourceBundleState
+ client client.Client
+}
+
+// Reconcile implements the loop that will manage the ResourceBundleState CR
+// We only accept CREATE events here and any subsequent changes are ignored.
+func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Reconciling ResourceBundleState %+v\n", req)
+
+ rbstate := &v1alpha1.ResourceBundleState{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, rbstate)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Object not found: %+v. Ignore as it must have been deleted.\n", req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get object: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updatePods(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding podstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateServices(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding services: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ // TODO: Update this based on the statuses of the lower resources
+ rbstate.Status.Ready = false
+ err = r.client.Status().Update(context.TODO(), rbstate)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+func (r *reconciler) updateServices(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Services created as well
+ serviceList := &corev1.ServiceList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, serviceList)
+ if err != nil {
+ log.Printf("Failed to list services: %v", err)
+ return err
+ }
+
+ rbstate.Status.ServiceStatuses = serviceList.Items
+ return nil
+}
+
+func (r *reconciler) updatePods(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the pods tracked
+ podList := &corev1.PodList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, podList)
+ if err != nil {
+ log.Printf("Failed to list pods: %v", err)
+ return err
+ }
+
+ rbstate.Status.PodStatuses = []v1alpha1.PodStatus{}
+
+ for _, pod := range podList.Items {
+ resStatus := v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ }
+ rbstate.Status.PodStatuses = append(rbstate.Status.PodStatuses, resStatus)
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/handler.go b/src/monitor/pkg/controller/resourcebundlestate/handler.go
new file mode 100644
index 00000000..b84af69c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/handler.go
@@ -0,0 +1,28 @@
+package resourcebundlestate
+
+import (
+ "k8s.io/client-go/util/workqueue"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+)
+
+// EventHandler adds some specific handling for certain types of events
+// related to the ResourceBundleState CR.
+type EventHandler struct {
+ handler.EnqueueRequestForObject
+}
+
+// Delete ignores any delete operations on a ResourceBundleState CR
+func (p *EventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Update ignores any update operations on a ResourceBundleState CR
+func (p *EventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Generic ignores any generic operations on a ResourceBundleState CR
+func (p *EventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+ return
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/helpers.go b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
new file mode 100644
index 00000000..dab78825
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
@@ -0,0 +1,54 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// checkLabel verifies if the expected label exists and returns bool
+func checkLabel(labels map[string]string) bool {
+
+ _, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return false
+ }
+ return true
+}
+
+// returnLabel verifies if the expected label exists and returns a map
+func returnLabel(labels map[string]string) map[string]string {
+
+ l, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return nil
+ }
+ return map[string]string{
+ "k8splugin.io/rb-inst-id": l,
+ }
+}
+
+// listResources lists resources based on the selectors provided
+// The data is returned in the pointer to the runtime.Object
+// provided as argument.
+func listResources(cli client.Client, namespace string,
+ labelSelector map[string]string, returnData runtime.Object) error {
+
+ listOptions := &client.ListOptions{
+ Namespace: namespace,
+ LabelSelector: labels.SelectorFromSet(labelSelector),
+ }
+
+ err := cli.List(context.TODO(), listOptions, returnData)
+ if err != nil {
+ log.Printf("Failed to list CRs: %v", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
new file mode 100644
index 00000000..e0742a13
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
@@ -0,0 +1,183 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddPodController the new controller to the controller manager
+func AddPodController(mgr manager.Manager) error {
+ return addPodController(mgr, newPodReconciler(mgr))
+}
+
+func addPodController(mgr manager.Manager, r *podReconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Pods
+ // Predicate filters pods which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &podPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newPodReconciler(m manager.Manager) *podReconciler {
+ return &podReconciler{client: m.GetClient()}
+}
+
+type podReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the pods we watch.
+func (r *podReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Pod: %+v\n", req)
+
+ pod := &corev1.Pod{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, pod)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Pod not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Pod's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the POD has been deleted.
+ r.deletePodFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get pod: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this pod via the labelselector
+ crSelector := returnLabel(pod.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Pod")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, pod)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deletePodFromAllCRs deletes pod status from all the CRs when the POD itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the pod's labels, we need to look at all the CRs in this namespace
+func (r *podReconciler) deletePodFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *podReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, pod *corev1.Pod) error {
+
+ for _, cr := range crl.Items {
+ // Pod is not scheduled for deletion
+ if pod.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, pod)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Pod is scheduled for deletion
+ r.deleteFromSingleCR(&cr, pod.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *podReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.PodStatuses)
+ for i, rstatus := range cr.Status.PodStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.PodStatuses[i] = cr.Status.PodStatuses[length-1]
+ cr.Status.PodStatuses[length-1] = v1alpha1.PodStatus{}
+ cr.Status.PodStatuses = cr.Status.PodStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for POD in CR")
+ return nil
+}
+
+func (r *podReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, pod *corev1.Pod) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.PodStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == pod.Name {
+ pod.Status.DeepCopyInto(&cr.Status.PodStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.PodStatuses = append(cr.Status.PodStatuses, v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
new file mode 100644
index 00000000..f1c1960c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type podPredicate struct {
+}
+
+func (p *podPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}