summaryrefslogtreecommitdiffstats
path: root/src/monitor/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'src/monitor/pkg')
-rw-r--r--src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go10
-rw-r--r--src/monitor/pkg/apis/apis.go13
-rw-r--r--src/monitor/pkg/apis/k8splugin/group.go6
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go4
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/register.go19
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/types.go55
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go141
-rw-r--r--src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go164
-rw-r--r--src/monitor/pkg/controller/add_resourcebundlestate.go10
-rw-r--r--src/monitor/pkg/controller/controller.go18
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/controller.go126
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/handler.go28
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/helpers.go54
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_controller.go183
-rw-r--r--src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go44
15 files changed, 875 insertions, 0 deletions
diff --git a/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go b/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go
new file mode 100644
index 00000000..3c8f595d
--- /dev/null
+++ b/src/monitor/pkg/apis/addtoscheme_k8splugin_v1alpha1.go
@@ -0,0 +1,10 @@
+package apis
+
+import (
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+)
+
+func init() {
+ // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back
+ AddToSchemes = append(AddToSchemes, v1alpha1.SchemeBuilder.AddToScheme)
+}
diff --git a/src/monitor/pkg/apis/apis.go b/src/monitor/pkg/apis/apis.go
new file mode 100644
index 00000000..07dc9616
--- /dev/null
+++ b/src/monitor/pkg/apis/apis.go
@@ -0,0 +1,13 @@
+package apis
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// AddToSchemes may be used to add all resources defined in the project to a Scheme
+var AddToSchemes runtime.SchemeBuilder
+
+// AddToScheme adds all Resources to the Scheme
+func AddToScheme(s *runtime.Scheme) error {
+ return AddToSchemes.AddToScheme(s)
+}
diff --git a/src/monitor/pkg/apis/k8splugin/group.go b/src/monitor/pkg/apis/k8splugin/group.go
new file mode 100644
index 00000000..9696c626
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/group.go
@@ -0,0 +1,6 @@
+// Package k8splugin contains k8splugin API versions.
+//
+// This file ensures Go source parsers acknowledge the k8splugin package
+// and any child packages. It can be removed if any other Go source files are
+// added to this package.
+package k8splugin
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go
new file mode 100644
index 00000000..c2bb0907
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/doc.go
@@ -0,0 +1,4 @@
+// Package v1alpha1 contains API Schema definitions for the k8splugin v1alpha1 API group
+// +k8s:deepcopy-gen=package,register
+// +groupName=k8splugin.io
+package v1alpha1
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go
new file mode 100644
index 00000000..ee2af820
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/register.go
@@ -0,0 +1,19 @@
+// NOTE: Boilerplate only. Ignore this file.
+
+// Package v1alpha1 contains API Schema definitions for the k8splugin v1alpha1 API group
+// +k8s:deepcopy-gen=package,register
+// +groupName=k8splugin.io
+package v1alpha1
+
+import (
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "sigs.k8s.io/controller-runtime/pkg/runtime/scheme"
+)
+
+var (
+ // SchemeGroupVersion is group version used to register these objects
+ SchemeGroupVersion = schema.GroupVersion{Group: "k8splugin.io", Version: "v1alpha1"}
+
+ // SchemeBuilder is used to add go types to the GroupVersionKind scheme
+ SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}
+)
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
new file mode 100644
index 00000000..22dfdd25
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
@@ -0,0 +1,55 @@
+package v1alpha1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// ResourceBundleState is the Schema for the ResourceBundleStatees API
+// +k8s:openapi-gen=true
+// +kubebuilder:subresource:status
+type ResourceBundleState struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+
+ Spec ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
+ Status ResourceBundleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
+}
+
+// ResourceBundleStateSpec defines the desired state of ResourceBundleState
+// +k8s:openapi-gen=true
+type ResourceBundleStateSpec struct {
+ Selector *metav1.LabelSelector `json:"selector" protobuf:"bytes,1,opt,name=selector"`
+}
+
+// ResourceBundleStatus defines the observed state of ResourceBundleState
+// +k8s:openapi-gen=true
+type ResourceBundleStatus struct {
+ Ready bool `json:"ready" protobuf:"varint,1,opt,name=ready"`
+ ResourceCount int32 `json:"resourceCount" protobuf:"varint,2,opt,name=resourceCount"`
+ PodStatuses []PodStatus `json:"podStatuses" protobuf:"varint,3,opt,name=podStatuses"`
+ ServiceStatuses []corev1.Service `json:"serviceStatuses" protobuf:"varint,4,opt,name=serviceStatuses"`
+}
+
+// PodStatus defines the observed state of ResourceBundleState
+// +k8s:openapi-gen=true
+type PodStatus struct {
+ metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+ Ready bool `json:"ready" protobuf:"varint,2,opt,name=ready"`
+ Status corev1.PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// ResourceBundleStateList contains a list of ResourceBundleState
+type ResourceBundleStateList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []ResourceBundleState `json:"items"`
+}
+
+func init() {
+ SchemeBuilder.Register(&ResourceBundleState{}, &ResourceBundleStateList{})
+}
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go
new file mode 100644
index 00000000..72036ed1
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.deepcopy.go
@@ -0,0 +1,141 @@
+// +build !ignore_autogenerated
+
+// Code generated by operator-sdk-v0.9.0-x86_64-linux-gnu. DO NOT EDIT.
+
+package v1alpha1
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ runtime "k8s.io/apimachinery/pkg/runtime"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PodStatus) DeepCopyInto(out *PodStatus) {
+ *out = *in
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Status.DeepCopyInto(&out.Status)
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodStatus.
+func (in *PodStatus) DeepCopy() *PodStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(PodStatus)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleState) DeepCopyInto(out *ResourceBundleState) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ in.Spec.DeepCopyInto(&out.Spec)
+ in.Status.DeepCopyInto(&out.Status)
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleState.
+func (in *ResourceBundleState) DeepCopy() *ResourceBundleState {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleState)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ResourceBundleState) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStateList) DeepCopyInto(out *ResourceBundleStateList) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ out.ListMeta = in.ListMeta
+ if in.Items != nil {
+ in, out := &in.Items, &out.Items
+ *out = make([]ResourceBundleState, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStateList.
+func (in *ResourceBundleStateList) DeepCopy() *ResourceBundleStateList {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStateList)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ResourceBundleStateList) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+ return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStateSpec) DeepCopyInto(out *ResourceBundleStateSpec) {
+ *out = *in
+ if in.Selector != nil {
+ in, out := &in.Selector, &out.Selector
+ *out = new(v1.LabelSelector)
+ (*in).DeepCopyInto(*out)
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStateSpec.
+func (in *ResourceBundleStateSpec) DeepCopy() *ResourceBundleStateSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStateSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ResourceBundleStatus) DeepCopyInto(out *ResourceBundleStatus) {
+ *out = *in
+ if in.PodStatuses != nil {
+ in, out := &in.PodStatuses, &out.PodStatuses
+ *out = make([]PodStatus, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ if in.ServiceStatuses != nil {
+ in, out := &in.ServiceStatuses, &out.ServiceStatuses
+ *out = make([]corev1.Service, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceBundleStatus.
+func (in *ResourceBundleStatus) DeepCopy() *ResourceBundleStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(ResourceBundleStatus)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go
new file mode 100644
index 00000000..232acefb
--- /dev/null
+++ b/src/monitor/pkg/apis/k8splugin/v1alpha1/zz_generated.openapi.go
@@ -0,0 +1,164 @@
+// +build !
+
+// This file was autogenerated by openapi-gen. Do not edit it manually!
+
+package v1alpha1
+
+import (
+ spec "github.com/go-openapi/spec"
+ common "k8s.io/kube-openapi/pkg/common"
+)
+
+func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
+ return map[string]common.OpenAPIDefinition{
+ "monitor/pkg/apis/k8splugin/v1alpha1.PodStatus": schema_pkg_apis_k8splugin_v1alpha1_PodStatus(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleState": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleState(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStateSpec(ref),
+ "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus": schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStatus(ref),
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_PodStatus(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "PodStatus defines the observed state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "metadata": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"),
+ },
+ },
+ "ready": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"boolean"},
+ Format: "",
+ },
+ },
+ "status": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/api/core/v1.PodStatus"),
+ },
+ },
+ },
+ Required: []string{"ready"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/api/core/v1.PodStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleState(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleState is the Schema for the ResourceBundleStatees API",
+ Properties: map[string]spec.Schema{
+ "kind": {
+ SchemaProps: spec.SchemaProps{
+ Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "apiVersion": {
+ SchemaProps: spec.SchemaProps{
+ Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "metadata": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"),
+ },
+ },
+ "spec": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec"),
+ },
+ },
+ "status": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus"),
+ },
+ },
+ },
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStateSpec", "monitor/pkg/apis/k8splugin/v1alpha1.ResourceBundleStatus"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStateSpec(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleStateSpec defines the desired state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "selector": {
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"),
+ },
+ },
+ },
+ Required: []string{"selector"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"},
+ }
+}
+
+func schema_pkg_apis_k8splugin_v1alpha1_ResourceBundleStatus(ref common.ReferenceCallback) common.OpenAPIDefinition {
+ return common.OpenAPIDefinition{
+ Schema: spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Description: "ResourceBundleStatus defines the observed state of ResourceBundleState",
+ Properties: map[string]spec.Schema{
+ "ready": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"boolean"},
+ Format: "",
+ },
+ },
+ "resourceCount": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"integer"},
+ Format: "int32",
+ },
+ },
+ "podStatuses": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("monitor/pkg/apis/k8splugin/v1alpha1.PodStatus"),
+ },
+ },
+ },
+ },
+ },
+ "serviceStatuses": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"array"},
+ Items: &spec.SchemaOrArray{
+ Schema: &spec.Schema{
+ SchemaProps: spec.SchemaProps{
+ Ref: ref("k8s.io/api/core/v1.Service"),
+ },
+ },
+ },
+ },
+ },
+ },
+ Required: []string{"ready", "resourceCount", "podStatuses", "serviceStatuses"},
+ },
+ },
+ Dependencies: []string{
+ "k8s.io/api/core/v1.Service", "monitor/pkg/apis/k8splugin/v1alpha1.PodStatus"},
+ }
+}
diff --git a/src/monitor/pkg/controller/add_resourcebundlestate.go b/src/monitor/pkg/controller/add_resourcebundlestate.go
new file mode 100644
index 00000000..c709dfd4
--- /dev/null
+++ b/src/monitor/pkg/controller/add_resourcebundlestate.go
@@ -0,0 +1,10 @@
+package controller
+
+import (
+ "monitor/pkg/controller/resourcebundlestate"
+)
+
+func init() {
+ AddToManagerFuncs = append(AddToManagerFuncs, resourcebundlestate.Add)
+ AddToManagerFuncs = append(AddToManagerFuncs, resourcebundlestate.AddPodController)
+}
diff --git a/src/monitor/pkg/controller/controller.go b/src/monitor/pkg/controller/controller.go
new file mode 100644
index 00000000..7c069f3e
--- /dev/null
+++ b/src/monitor/pkg/controller/controller.go
@@ -0,0 +1,18 @@
+package controller
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+// AddToManagerFuncs is a list of functions to add all Controllers to the Manager
+var AddToManagerFuncs []func(manager.Manager) error
+
+// AddToManager adds all Controllers to the Manager
+func AddToManager(m manager.Manager) error {
+ for _, f := range AddToManagerFuncs {
+ if err := f(m); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/controller.go b/src/monitor/pkg/controller/resourcebundlestate/controller.go
new file mode 100644
index 00000000..debd5f65
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/controller.go
@@ -0,0 +1,126 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// Add the new controller to the controller manager
+func Add(mgr manager.Manager) error {
+ return add(mgr, newReconciler(mgr))
+}
+
+func add(mgr manager.Manager, r *reconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to primary resource ResourceBundleState
+ err = c.Watch(&source.Kind{Type: &v1alpha1.ResourceBundleState{}}, &EventHandler{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newReconciler(m manager.Manager) *reconciler {
+ return &reconciler{client: m.GetClient()}
+}
+
+type reconciler struct {
+ // Stores an array of all the ResourceBundleState
+ crList []v1alpha1.ResourceBundleState
+ client client.Client
+}
+
+// Reconcile implements the loop that will manage the ResourceBundleState CR
+// We only accept CREATE events here and any subsequent changes are ignored.
+func (r *reconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Reconciling ResourceBundleState %+v\n", req)
+
+ rbstate := &v1alpha1.ResourceBundleState{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, rbstate)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Object not found: %+v. Ignore as it must have been deleted.\n", req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get object: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updatePods(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding podstatuses: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ err = r.updateServices(rbstate, rbstate.Spec.Selector.MatchLabels)
+ if err != nil {
+ log.Printf("Error adding services: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ // TODO: Update this based on the statuses of the lower resources
+ rbstate.Status.Ready = false
+ err = r.client.Status().Update(context.TODO(), rbstate)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+func (r *reconciler) updateServices(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the Services created as well
+ serviceList := &corev1.ServiceList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, serviceList)
+ if err != nil {
+ log.Printf("Failed to list services: %v", err)
+ return err
+ }
+
+ rbstate.Status.ServiceStatuses = serviceList.Items
+ return nil
+}
+
+func (r *reconciler) updatePods(rbstate *v1alpha1.ResourceBundleState,
+ selectors map[string]string) error {
+
+ // Update the CR with the pods tracked
+ podList := &corev1.PodList{}
+ err := listResources(r.client, rbstate.Namespace, selectors, podList)
+ if err != nil {
+ log.Printf("Failed to list pods: %v", err)
+ return err
+ }
+
+ rbstate.Status.PodStatuses = []v1alpha1.PodStatus{}
+
+ for _, pod := range podList.Items {
+ resStatus := v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ }
+ rbstate.Status.PodStatuses = append(rbstate.Status.PodStatuses, resStatus)
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/handler.go b/src/monitor/pkg/controller/resourcebundlestate/handler.go
new file mode 100644
index 00000000..b84af69c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/handler.go
@@ -0,0 +1,28 @@
+package resourcebundlestate
+
+import (
+ "k8s.io/client-go/util/workqueue"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+)
+
+// EventHandler adds some specific handling for certain types of events
+// related to the ResourceBundleState CR.
+type EventHandler struct {
+ handler.EnqueueRequestForObject
+}
+
+// Delete ignores any delete operations on a ResourceBundleState CR
+func (p *EventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Update ignores any update operations on a ResourceBundleState CR
+func (p *EventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+ return
+}
+
+// Generic ignores any generic operations on a ResourceBundleState CR
+func (p *EventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+ return
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/helpers.go b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
new file mode 100644
index 00000000..dab78825
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/helpers.go
@@ -0,0 +1,54 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// checkLabel verifies if the expected label exists and returns bool
+func checkLabel(labels map[string]string) bool {
+
+ _, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return false
+ }
+ return true
+}
+
+// returnLabel verifies if the expected label exists and returns a map
+func returnLabel(labels map[string]string) map[string]string {
+
+ l, ok := labels["k8splugin.io/rb-inst-id"]
+ if !ok {
+ log.Printf("Pod does not have label. Filter it.")
+ return nil
+ }
+ return map[string]string{
+ "k8splugin.io/rb-inst-id": l,
+ }
+}
+
+// listResources lists resources based on the selectors provided
+// The data is returned in the pointer to the runtime.Object
+// provided as argument.
+func listResources(cli client.Client, namespace string,
+ labelSelector map[string]string, returnData runtime.Object) error {
+
+ listOptions := &client.ListOptions{
+ Namespace: namespace,
+ LabelSelector: labels.SelectorFromSet(labelSelector),
+ }
+
+ err := cli.List(context.TODO(), listOptions, returnData)
+ if err != nil {
+ log.Printf("Failed to list CRs: %v", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
new file mode 100644
index 00000000..e0742a13
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_controller.go
@@ -0,0 +1,183 @@
+package resourcebundlestate
+
+import (
+ "context"
+ "log"
+
+ "monitor/pkg/apis/k8splugin/v1alpha1"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+// AddPodController the new controller to the controller manager
+func AddPodController(mgr manager.Manager) error {
+ return addPodController(mgr, newPodReconciler(mgr))
+}
+
+func addPodController(mgr manager.Manager, r *podReconciler) error {
+ // Create a new controller
+ c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to secondar resource Pods
+ // Predicate filters pods which don't have the k8splugin label
+ err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &podPredicate{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func newPodReconciler(m manager.Manager) *podReconciler {
+ return &podReconciler{client: m.GetClient()}
+}
+
+type podReconciler struct {
+ client client.Client
+}
+
+// Reconcile implements the loop that will update the ResourceBundleState CR
+// whenever we get any updates from all the pods we watch.
+func (r *podReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
+ log.Printf("Updating ResourceBundleState for Pod: %+v\n", req)
+
+ pod := &corev1.Pod{}
+ err := r.client.Get(context.TODO(), req.NamespacedName, pod)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ log.Printf("Pod not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
+ // Remove the Pod's status from StatusList
+ // This can happen if we get the DeletionTimeStamp event
+ // after the POD has been deleted.
+ r.deletePodFromAllCRs(req.NamespacedName)
+ return reconcile.Result{}, nil
+ }
+ log.Printf("Failed to get pod: %+v\n", req.NamespacedName)
+ return reconcile.Result{}, err
+ }
+
+ // Find the CRs which track this pod via the labelselector
+ crSelector := returnLabel(pod.GetLabels())
+ if crSelector == nil {
+ log.Println("We should not be here. The predicate should have filtered this Pod")
+ }
+
+ // Get the CRs which have this label and update them all
+ // Ideally, we will have only one CR, but there is nothing
+ // preventing the creation of multiple.
+ // TODO: Consider using an admission validating webook to prevent multiple
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return reconcile.Result{}, nil
+ }
+
+ err = r.updateCRs(rbStatusList, pod)
+ if err != nil {
+ // Requeue the update
+ return reconcile.Result{}, err
+ }
+
+ return reconcile.Result{}, nil
+}
+
+// deletePodFromAllCRs deletes pod status from all the CRs when the POD itself has been deleted
+// and we have not handled the updateCRs yet.
+// Since, we don't have the pod's labels, we need to look at all the CRs in this namespace
+func (r *podReconciler) deletePodFromAllCRs(namespacedName types.NamespacedName) error {
+
+ rbStatusList := &v1alpha1.ResourceBundleStateList{}
+ err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
+ if err != nil || len(rbStatusList.Items) == 0 {
+ log.Printf("Did not find any CRs tracking this resource\n")
+ return nil
+ }
+ for _, cr := range rbStatusList.Items {
+ r.deleteFromSingleCR(&cr, namespacedName.Name)
+ }
+
+ return nil
+}
+
+func (r *podReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, pod *corev1.Pod) error {
+
+ for _, cr := range crl.Items {
+ // Pod is not scheduled for deletion
+ if pod.DeletionTimestamp == nil {
+ err := r.updateSingleCR(&cr, pod)
+ if err != nil {
+ return err
+ }
+ } else {
+ // Pod is scheduled for deletion
+ r.deleteFromSingleCR(&cr, pod.Name)
+ }
+ }
+
+ return nil
+}
+
+func (r *podReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
+ cr.Status.ResourceCount--
+ length := len(cr.Status.PodStatuses)
+ for i, rstatus := range cr.Status.PodStatuses {
+ if rstatus.Name == name {
+ //Delete that status from the array
+ cr.Status.PodStatuses[i] = cr.Status.PodStatuses[length-1]
+ cr.Status.PodStatuses[length-1] = v1alpha1.PodStatus{}
+ cr.Status.PodStatuses = cr.Status.PodStatuses[:length-1]
+ return nil
+ }
+ }
+
+ log.Println("Did not find a status for POD in CR")
+ return nil
+}
+
+func (r *podReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, pod *corev1.Pod) error {
+
+ // Update status after searching for it in the list of resourceStatuses
+ for i, rstatus := range cr.Status.PodStatuses {
+ // Look for the status if we already have it in the CR
+ if rstatus.Name == pod.Name {
+ pod.Status.DeepCopyInto(&cr.Status.PodStatuses[i].Status)
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+ return nil
+ }
+ }
+
+ // Exited for loop with no status found
+ // Increment the number of tracked resources
+ cr.Status.ResourceCount++
+
+ // Add it to CR
+ cr.Status.PodStatuses = append(cr.Status.PodStatuses, v1alpha1.PodStatus{
+ ObjectMeta: pod.ObjectMeta,
+ Ready: false,
+ Status: pod.Status,
+ })
+
+ err := r.client.Status().Update(context.TODO(), cr)
+ if err != nil {
+ log.Printf("failed to update rbstate: %v\n", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
new file mode 100644
index 00000000..f1c1960c
--- /dev/null
+++ b/src/monitor/pkg/controller/resourcebundlestate/pod_predicate.go
@@ -0,0 +1,44 @@
+package resourcebundlestate
+
+import (
+ "sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+type podPredicate struct {
+}
+
+func (p *podPredicate) Create(evt event.CreateEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Delete(evt event.DeleteEvent) bool {
+
+ if evt.Meta == nil {
+ return false
+ }
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Update(evt event.UpdateEvent) bool {
+
+ if evt.MetaNew == nil {
+ return false
+ }
+
+ labels := evt.MetaNew.GetLabels()
+ return checkLabel(labels)
+}
+
+func (p *podPredicate) Generic(evt event.GenericEvent) bool {
+
+ labels := evt.Meta.GetLabels()
+ return checkLabel(labels)
+}