aboutsummaryrefslogtreecommitdiffstats
path: root/src/monitor/pkg
diff options
context:
space:
mode:
authorKiran Kamineni <kiran.k.kamineni@intel.com>2019-07-31 15:32:28 -0700
committerKiran Kamineni <kiran.k.kamineni@intel.com>2019-08-28 12:45:14 -0700
commitab8c95eff5c1228237f758d3ccfc99c751f713f7 (patch)
treea950ac5356f547434f24c8cc3bf53f3620e84a74 /src/monitor/pkg
parentdcace0784979890bb986fb078348b4b3ceef146c (diff)
Adding monitor operator to monitor edge resources
Add an operator to monitor resources at the edge location. The operator listens to pods and services right now and stores their information in a CustomResource Issue-ID: MULTICLOUD-675 Change-Id: I801478a77fcd019010ea1b4388d6077f63b89d05 Signed-off-by: Kiran Kamineni <kiran.k.kamineni@intel.com>
Diffstat (limited to 'src/monitor/pkg')
-rw-r--r--src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go10
-rw-r--r--src/monitor/pkg/apis/apis.go13
-rw-r--r--src/monitor/pkg/apis/k8splugin/group.go6
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go4
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/register.go19
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/types.go55
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go141
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go164
-rw-r--r--src/monitor/pkg/controller/add_resourcebundlestate.go10
-rw-r--r--src/monitor/pkg/controller/controller.go18
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/controller.go126
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/handler.go28
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/helpers.go54
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_controller.go183
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go44
15 files changed, 875 insertions, 0 deletions
diff --git a/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go b/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go
new file mode 100644
index 00000000..3c8f595d
--- /dev/null
+++ b/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go
@@ -0,0 +1,10 @@
+package apis
+
+import (
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+)
+
+func init() {
+ // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back
+ AddToSchemes = append(AddToSchemes, v1alpha1.SchemeBuilder.AddToScheme)
+}
diff --git a/src/monitor/pkg/apis/apis.go b/src/monitor/pkg/apis/apis.go
new file mode 100644
index 00000000..07dc9616
--- /dev/null
+++ b/src/monitor/pkg/apis/apis.go
@@ -0,0 +1,13 @@
+package apis
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// AddToSchemes may be used to add all resources defined in the project to a Scheme
+var AddToSchemes runtime.SchemeBuilder
+
+// AddToScheme adds all Resources to the Scheme
+func AddToScheme(s *runtime.Scheme) error {
+ return AddToSchemes.AddToScheme(s)
+}
diff --git a/src/monitor/pkg/apis/k8splugin/group.go b/src/monitor/pkg/apis/k8splugin/group.go
new file mode 100644
index 00000000..9696c626
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/group.go
@@ -0,0 +1,6 @@
+// Package k8splugin contains k8splugin API versions.
+//
+// This file ensures Go source parsers acknowledge the k8splugin package
+// and any child packages. It can be removed if any other Go source files are
+// added to this package.
+package k8splugin
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go
new file mode 100644
index 00000000..c2bb0907
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go
@@ -0,0 +1,4 @@
+// Package v1alpha1 contains API Schema definitions for the k8splugin v1alpha1 API group
+// +k8s:deepcopy-gen=package,register
+// +groupName=k8splugin.io
+package v1alpha1
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go
new file mode 100644
index 00000000..ee2af820
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go
@@ -0,0 +1,19 @@
+// NOTE: Boilerplate only. Ignore this file.
+
+// Package v1alpha1 contains API Schema definitions for the k8splugin v1alpha1 API group
+// +k8s:deepcopy-gen=package,register
+// +groupName=k8splugin.io
+package v1alpha1
+
+import (
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "sigs.k8s.io/controller-runtime/pkg/runtime/scheme"
+)
+
+var (
+ // SchemeGroupVersion is group version used to register these objects
+ SchemeGroupVersion = schema.GroupVersion{Group: "k8splugin.io", Version: "v1alpha1"}
+
+ // SchemeBuilder is used to add go types to the GroupVersionKind scheme
+ SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}
+)
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
new file mode 100644
index 00000000..22dfdd25
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
@@ -0,0 +1,55 @@
+package v1alpha1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// ResourceBundleState is the Schema for the ResourceBundleStatees API
+// +k8s:openapi-gen=true
+// +kubebuilder:subresource:status
+type ResourceBundleState struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+
+ Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
+ Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
+}
+
+// ResourceBundleStateSpec defines the desired state of ResourceBundleState
+// +k8s:openapi-gen=true
+type ResourceBundleStateSpec struct {
+ Selector *metav1.LabelSelector `json:"selector" protobuf:"bytes,1,opt,name=selector"`
+}
+
+// ResourceBundleStatus defines the observed state of ResourceBundleState
+// +k8s:openapi-gen=true
+type ResourceBundleStatus struct {
+ Ready bool `json:"ready" protobuf:"varint,1,opt,name=ready"`
+ ResourceCount int32 `json:"resourceCount" protobuf:"varint,2,opt,name=resourceCount"`
+ PodStatuses []PodStatus `json:"podStatuses" protobuf:"varint,3,opt,name=podStatuses"`
+ ServiceStatuses []corev1.Service `json:"serviceStatuses" protobuf:"varint,4,opt,name=serviceStatuses"`
+}
+
+// PodStatus defines the observed state of ResourceBundleState
+// +k8s:openapi-gen=true
+type PodStatus struct {
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+ Ready bool `json:"ready" protobuf:"varint,2,opt,name=ready"`
+ Status corev1.PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// ResourceBundleStateList contains a list of ResourceBundleState
+type ResourceBundleStateList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []ResourceBundleState `json:"items"`
+}
+
+func init() {
+ SchemeBuilder.Register(&ResourceBundleState{}, &ResourceBundleStateList{})
+}
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go
new file mode 100644
index 00000000..72036ed1
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go
@@ -0,0 +1,141 @@
+// +build !ignore_autogenerated
+
+// Code generated by operator-sdk-v0.9.0-x86_64-linux-gnu. DO NOT EDIT.
+
+package v1alpha1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ runtime "k8s.io/apimachinery/pkg/runtime"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PodStatus) DeepCopyInto(out *PodStatus) {
+ *out = *in
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Status.DeepCopyInto(&out.Status)
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodStatus.
+func (in *PodStatus) DeepCopy() *PodStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(PodStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleState) DeepCopyInto(out *ResourceBundleState) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Spec.DeepCopyInto(&out.Spec)
+ in.Status.DeepCopyInto(&out.Status)
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleState.
+func (in *ResourceBundleState) DeepCopy() *ResourceBundleState {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleState)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ResourceBundleState) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStateList) DeepCopyInto(out *ResourceBundleStateList) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ out.ListMeta = in.ListMeta
+ if in.Items != nil {
+ in, out := &in.Items, &out.Items
+ *out = make([]ResourceBundleState, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStateList.
+func (in *ResourceBundleStateList) DeepCopy() *ResourceBundleStateList {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStateList)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ResourceBundleStateList) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStateSpec) DeepCopyInto(out *ResourceBundleStateSpec) {
+ *out = *in
+ if in.Selector != nil {
+ in, out := &in.Selector, &out.Selector
+ *out = new(v1.LabelSelector)
+ (*in).DeepCopyInto(*out)
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStateSpec.
+func (in *ResourceBundleStateSpec) DeepCopy() *ResourceBundleStateSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStateSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStatus) DeepCopyInto(out *ResourceBundleStatus) {
+ *out = *in
+ if in.PodStatuses != nil {
+ in, out := &in.PodStatuses, &out.PodStatuses
+ *out = make([]PodStatus, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ if in.ServiceStatuses != nil {
+ in, out := &in.ServiceStatuses, &out.ServiceStatuses
+ *out = make([]corev1.Service, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStatus.
+func (in *ResourceBundleStatus) DeepCopy() *ResourceBundleStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStatus)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go
new file mode 100644
index 00000000..232acefb
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go
@@ -0,0 +1,164 @@
+// +build !
+
+// This file was autogenerated by openapi-gen. Do not edit it manually!
+
+package v1alpha1
+
+import (
+ spec "github.com/go-openapi/spec"
+ common "k8s.io/kube-openapi/pkg/common"
+)
+
+func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
+ return map[string]common.OpenAPIDefinition{
+ "monitor/pkg/apis/k8splugin/v1alpha1.PodStatus": schema_pkg_apis_k8splugin_v1alpha1_PodStatus(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleState": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleState(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStateSpec(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStatus(ref),
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_PodStatus(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "PodStatus defines the observed state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "metadata": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"),
+ },
+ },
+ "ready": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"boolean"},
+ Format: "",
+ },
+ },
+ "status": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/api/core/v1.PodStatus"),
+ },
+ },
+ },
+ Required: []string{"ready"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/api/core/v1.PodStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleState(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleState is the Schema for the ResourceBundleStatees API",
+ Properties: map[string]spec.Schema{
+ "kind": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "apiVersion": {
+ SchemaProps: spec.SchemaProps{
+ Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "metadata": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"),
+ },
+ },
+ "spec": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec"),
+ },
+ },
+ "status": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus"),
+ },
+ },
+ },
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec", "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleStateSpec defines the desired state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "selector": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"),
+ },
+ },
+ },
+ Required: []string{"selector"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStatus(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleStatus defines the observed state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "ready": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"boolean"},
+ Format: "",
+ },
+ },
+ "resourceCount": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"integer"},
+ Format: "int32",
+ },
+ },
+ "podStatuses": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.PodStatus"),
+ },
+ },
+ },
+ },
+ },
+ "serviceStatuses": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/api/core/v1.Service"),
+ },
+ },
+ },
+ },
+ },
+ },
+ Required: []string{"ready", "resourceCount", "podStatuses", "serviceStatuses"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/api/core/v1.Service", "monitor/pkg/apis/k8splugin/v1alpha1.PodStatus"},
+ }
+}
diff --git a/src/monitor/pkg/controller/add_resourcebundlestate.go b/src/monitor/pkg/controller/add_resourcebundlestate.go
new file mode 100644
index 00000000..c709dfd4
--- /dev/null
+++ b/src/monitor/pkg/controller/add_resourcebundlestate.go
@@ -0,0 +1,10 @@
+package controller
+
+import (
+ "monitor/pkg/controller/resourcebundlestate"
+)
+
+func init() {
+ AddToManagerFuncs = append(AddToManagerFuncs, resourcebundlestate.Add)
+ AddToManagerFuncs = append(AddToManagerFuncs, resourcebundlestate.AddPodController)
+}
diff --git a/src/monitor/pkg/controller/controller.go b/src/monitor/pkg/controller/controller.go
new file mode 100644
index 00000000..7c069f3e
--- /dev/null
+++ b/src/monitor/pkg/controller/controller.go
@@ -0,0 +1,18 @@
+package controller
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+// AddToManagerFuncs is a list of functions to add all Controllers to the Manager
+var AddToManagerFuncs []func(manager.Manager) error
+
+// AddToManager adds all Controllers to the Manager
+func AddToManager(m manager.Manager) error {
+ for _, f := range AddToManagerFuncs {
+ if err := f(m); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/controller.go b/src/monitor/pkg/controller/resourcebundlestate/controller.go
new file mode 100644
index 00000000..debd5f65
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/controller.go
@@ -0,0 +1,126 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// Add the new controller to the controller manager
+func Add(mgr manager.Manager) error {
+ return add(mgr, newReconciler(mgr))
+}
+
+func add(mgr manager.Manager, r *reconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to primary resource ResourceBundleState
+ err = c.Watch(&source.Kind{Type: &v1alpha1.ResourceBundleState{}}, &EventHandler{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newReconciler(m manager.Manager) *reconciler {
+ return &reconciler{client: m.GetClient()}
+}
+
+type reconciler struct {
+ // Stores an array of all the ResourceBundleState
+ crList []v1alpha1.ResourceBundleState
+ client client.Client
+}
+
+// Reconcile implements the loop that will manage the ResourceBundleState CR
+// We only accept CREATE events here and any subsequent changes are ignored.
+func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Reconciling ResourceBundleState %+v\n", req)
+
+ rbstate := &v1alpha1.ResourceBundleState{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, rbstate)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Object not found: %+v. Ignore as it must have been deleted.\n", req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get object: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updatePods(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding podstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateServices(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding services: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ // TODO: Update this based on the statuses of the lower resources
+ rbstate.Status.Ready = false
+ err = r.client.Status().Update(context.TODO(), rbstate)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+func (r *reconciler) updateServices(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Services created as well
+ serviceList := &corev1.ServiceList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, serviceList)
+ if err != nil {
+ log.Printf("Failed to list services: %v", err)
+ return err
+ }
+
+ rbstate.Status.ServiceStatuses = serviceList.Items
+ return nil
+}
+
+func (r *reconciler) updatePods(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the pods tracked
+ podList := &corev1.PodList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, podList)
+ if err != nil {
+ log.Printf("Failed to list pods: %v", err)
+ return err
+ }
+
+ rbstate.Status.PodStatuses = []v1alpha1.PodStatus{}
+
+ for _, pod := range podList.Items {
+ resStatus := v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ }
+ rbstate.Status.PodStatuses = append(rbstate.Status.PodStatuses, resStatus)
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/handler.go b/src/monitor/pkg/controller/resourcebundlestate/handler.go
new file mode 100644
index 00000000..b84af69c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/handler.go
@@ -0,0 +1,28 @@
+package resourcebundlestate
+
+import (
+ "k8s.io/client-go/util/workqueue"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+)
+
+// EventHandler adds some specific handling for certain types of events
+// related to the ResourceBundleState CR.
+type EventHandler struct {
+ handler.EnqueueRequestForObject
+}
+
+// Delete ignores any delete operations on a ResourceBundleState CR
+func (p *EventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Update ignores any update operations on a ResourceBundleState CR
+func (p *EventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Generic ignores any generic operations on a ResourceBundleState CR
+func (p *EventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+ return
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/helpers.go b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
new file mode 100644
index 00000000..dab78825
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
@@ -0,0 +1,54 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// checkLabel verifies if the expected label exists and returns bool
+func checkLabel(labels map[string]string) bool {
+
+ _, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return false
+ }
+ return true
+}
+
+// returnLabel verifies if the expected label exists and returns a map
+func returnLabel(labels map[string]string) map[string]string {
+
+ l, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return nil
+ }
+ return map[string]string{
+ "k8splugin.io/rb-inst-id": l,
+ }
+}
+
+// listResources lists resources based on the selectors provided
+// The data is returned in the pointer to the runtime.Object
+// provided as argument.
+func listResources(cli client.Client, namespace string,
+ labelSelector map[string]string, returnData runtime.Object) error {
+
+ listOptions := &client.ListOptions{
+ Namespace: namespace,
+ LabelSelector: labels.SelectorFromSet(labelSelector),
+ }
+
+ err := cli.List(context.TODO(), listOptions, returnData)
+ if err != nil {
+ log.Printf("Failed to list CRs: %v", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
new file mode 100644
index 00000000..e0742a13
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
@@ -0,0 +1,183 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddPodController the new controller to the controller manager
+func AddPodController(mgr manager.Manager) error {
+ return addPodController(mgr, newPodReconciler(mgr))
+}
+
+func addPodController(mgr manager.Manager, r *podReconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Pods
+ // Predicate filters pods which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &podPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newPodReconciler(m manager.Manager) *podReconciler {
+ return &podReconciler{client: m.GetClient()}
+}
+
+type podReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the pods we watch.
+func (r *podReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Pod: %+v\n", req)
+
+ pod := &corev1.Pod{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, pod)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Pod not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Pod's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the POD has been deleted.
+ r.deletePodFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get pod: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this pod via the labelselector
+ crSelector := returnLabel(pod.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Pod")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, pod)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deletePodFromAllCRs deletes pod status from all the CRs when the POD itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the pod's labels, we need to look at all the CRs in this namespace
+func (r *podReconciler) deletePodFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *podReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, pod *corev1.Pod) error {
+
+ for _, cr := range crl.Items {
+ // Pod is not scheduled for deletion
+ if pod.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, pod)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Pod is scheduled for deletion
+ r.deleteFromSingleCR(&cr, pod.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *podReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.PodStatuses)
+ for i, rstatus := range cr.Status.PodStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.PodStatuses[i] = cr.Status.PodStatuses[length-1]
+ cr.Status.PodStatuses[length-1] = v1alpha1.PodStatus{}
+ cr.Status.PodStatuses = cr.Status.PodStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for POD in CR")
+ return nil
+}
+
+func (r *podReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, pod *corev1.Pod) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.PodStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == pod.Name {
+ pod.Status.DeepCopyInto(&cr.Status.PodStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.PodStatuses = append(cr.Status.PodStatuses, v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
new file mode 100644
index 00000000..f1c1960c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type podPredicate struct {
+}
+
+func (p *podPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}