author    Dileep Ranganathan <dileep.ranganathan@intel.com>  2019-11-08 11:26:47 -0800
committer Dileep Ranganathan <dileep.ranganathan@intel.com>  2019-11-13 18:08:55 +0000
commit    fb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (patch)
tree      427183f34e0f12b56be05c9175dbdf83bb5a1234 /vnfs/DAaaS/microservices/remote-config-operator/pkg
parent    2026cb5283fbc44a4f68641f6e85628381ebda04 (diff)
Remote Write Config and Filter operator
Added support for Kafka Remote writer.

Issue-ID: ONAPARC-393
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Change-Id: I46555a15b0f326ffcd305d28e82d244c86a34644
Diffstat (limited to 'vnfs/DAaaS/microservices/remote-config-operator/pkg')
-rw-r--r--  vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go    23
-rw-r--r--  vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go           6
-rw-r--r--  vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go             25
-rw-r--r--  vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go              40
-rw-r--r--  vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go  136
5 files changed, 172 insertions, 58 deletions
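For orientation, the sketch below (not part of the commit) shows how a PrometheusRemoteEndpoint targeting the new Kafka remote writer might be constructed in Go. The name, namespace, label value, adapter address, and Kafka settings are illustrative assumptions; only the field names come from the types changed in this patch.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
)

func main() {
	// Hypothetical object; all values are assumptions for illustration.
	pre := &onapv1alpha1.PrometheusRemoteEndpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "kafka-remote-endpoint",
			Namespace: "edge1",
			// The controller looks up the target Prometheus CR by the "app" label.
			Labels: map[string]string{"app": "cp"},
		},
		Spec: onapv1alpha1.PrometheusRemoteEndpointSpec{
			AdapterURL: "http://prom-kafka-adapter:8686", // assumed adapter service address
			Type:       "kafka",
			// Spec.KafkaConfig is a plain string; a JSON document using the
			// librdkafka-style keys of the KafkaConfig type is assumed here.
			KafkaConfig: `{"bootstrap.servers":"kafka:9092","topic":"metrics","usePartition":false}`,
		},
	}
	fmt.Println(pre.Spec.Type, pre.Spec.AdapterURL)
}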
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go
index 71b38dc2..bc633c1a 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go
@@ -4,15 +4,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
-type Metadata struct {
- Name string `json:"Name"`
- Labels []string `json:"Labels"`
-}
-
// PrometheusRemoteEndpointSpec defines the desired state of PrometheusRemoteEndpoint
// +k8s:openapi-gen=true
type PrometheusRemoteEndpointSpec struct {
- AdapterUrl string `json:"adapterUrl"`
+ AdapterURL string `json:"adapterURL"`
FilterSelector *metav1.LabelSelector `json:"filterSelector,omitempty"`
Type string `json:"type"`
KafkaConfig string `json:"kafkaConfig,omitempty"`
@@ -20,12 +15,16 @@ type PrometheusRemoteEndpointSpec struct {
RemoteTimeout string `json:"remoteTimeout,omitempty"`
}
+// KafkaConfig defines the desired remote Kafka writer configuration
type KafkaConfig struct {
- BrokerUrl string `json:"brokerUrl"`
- Group string `json:"group,omitempty"`
- Topic string `json:"topic"`
+ BrokerURL string `json:"bootstrap.servers"`
+ Topic string `json:"topic"`
+ UsePartition bool `json:"usePartition"`
+ BatchMsgNum int `json:"batch.num.messages,omitempty"`
+ Compression string `json:"compression.codec,omitempty"`
}
+// QueueConfig defines the Prometheus remote write queue configuration
type QueueConfig struct {
BatchSendDeadline string `json:"batchSendDeadline,omitempty"`
Capacity string `json:"capacity,omitempty"`
@@ -39,10 +38,16 @@ type QueueConfig struct {
// PrometheusRemoteEndpointStatus defines the observed state of PrometheusRemoteEndpoint
// +k8s:openapi-gen=true
+// +kubebuilder:subresource:status
type PrometheusRemoteEndpointStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
// Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html
+ // Status can be Error, Enabled
+ PrometheusInstance string `json:"prometheusInstance,omitempty"`
+ Status string `json:"status"`
+ KafkaWriterID string `json:"kafkaWriterID,omitempty"`
+ RemoteURL string `json:"remoteURL,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
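As a quick illustration (not part of the patch), marshaling the new KafkaConfig type shows the librdkafka-style JSON keys declared in its struct tags; the broker, topic, and tuning values below are assumptions.

package main

import (
	"encoding/json"
	"fmt"

	onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
)

func main() {
	cfg := onapv1alpha1.KafkaConfig{
		BrokerURL:    "kafka-broker:9092", // assumed broker address
		Topic:        "remote-metrics",    // assumed topic
		UsePartition: false,
		BatchMsgNum:  10000,    // assumed batch size
		Compression:  "snappy", // assumed codec
	}
	out, _ := json.Marshal(cfg)
	fmt.Println(string(out))
	// {"bootstrap.servers":"kafka-broker:9092","topic":"remote-metrics","usePartition":false,"batch.num.messages":10000,"compression.codec":"snappy"}
}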
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go
index 18b3dc6d..4c09698c 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go
@@ -7,8 +7,10 @@ import (
// RemoteFilterActionSpec defines the desired state of RemoteFilterAction
// +k8s:openapi-gen=true
type RemoteFilterActionSpec struct {
- Action string `json:"action,omitempty"`
- Regex string `json:"regex,omitempty"`
+ Action string `json:"action,omitempty"`
+ Regex string `json:"regex,omitempty"`
+ // SourceLabels are the labels of each metric
+ // +listType=set
SourceLabels []string `json:"sourceLabels,omitempty"`
TargetLabel string `json:"targetLabel,omitempty"`
Replacement string `json:"replacement,omitempty"`
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go
index 9fb59405..15aef9f7 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go
@@ -26,27 +26,6 @@ func (in *KafkaConfig) DeepCopy() *KafkaConfig {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Metadata) DeepCopyInto(out *Metadata) {
- *out = *in
- if in.Labels != nil {
- in, out := &in.Labels, &out.Labels
- *out = make([]string, len(*in))
- copy(*out, *in)
- }
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Metadata.
-func (in *Metadata) DeepCopy() *Metadata {
- if in == nil {
- return nil
- }
- out := new(Metadata)
- in.DeepCopyInto(out)
- return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PrometheusRemoteEndpoint) DeepCopyInto(out *PrometheusRemoteEndpoint) {
*out = *in
out.TypeMeta = in.TypeMeta
@@ -78,7 +57,7 @@ func (in *PrometheusRemoteEndpoint) DeepCopyObject() runtime.Object {
func (in *PrometheusRemoteEndpointList) DeepCopyInto(out *PrometheusRemoteEndpointList) {
*out = *in
out.TypeMeta = in.TypeMeta
- out.ListMeta = in.ListMeta
+ in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]PrometheusRemoteEndpoint, len(*in))
@@ -193,7 +172,7 @@ func (in *RemoteFilterAction) DeepCopyObject() runtime.Object {
func (in *RemoteFilterActionList) DeepCopyInto(out *RemoteFilterActionList) {
*out = *in
out.TypeMeta = in.TypeMeta
- out.ListMeta = in.ListMeta
+ in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]RemoteFilterAction, len(*in))
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
index ca4c9511..3dba690e 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
@@ -71,7 +71,7 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointSpec(ref common.Refer
Description: "PrometheusRemoteEndpointSpec defines the desired state of PrometheusRemoteEndpoint",
Type: []string{"object"},
Properties: map[string]spec.Schema{
- "adapterUrl": {
+ "adapterURL": {
SchemaProps: spec.SchemaProps{
Type: []string{"string"},
Format: "",
@@ -106,7 +106,7 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointSpec(ref common.Refer
},
},
},
- Required: []string{"adapterUrl", "type"},
+ Required: []string{"adapterURL", "type"},
},
},
Dependencies: []string{
@@ -120,6 +120,34 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointStatus(ref common.Ref
SchemaProps: spec.SchemaProps{
Description: "PrometheusRemoteEndpointStatus defines the observed state of PrometheusRemoteEndpoint",
Type: []string{"object"},
+ Properties: map[string]spec.Schema{
+ "prometheusInstance": {
+ SchemaProps: spec.SchemaProps{
+ Description: "INSERT ADDITIONAL STATUS FIELD - define observed state of cluster Important: Run \"operator-sdk generate k8s\" to regenerate code after modifying this file Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html Status can be Error, Enabled",
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "status": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "kafkaWriterID": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ "remoteURL": {
+ SchemaProps: spec.SchemaProps{
+ Type: []string{"string"},
+ Format: "",
+ },
+ },
+ },
+ Required: []string{"status"},
},
},
}
@@ -189,8 +217,14 @@ func schema_pkg_apis_onap_v1alpha1_RemoteFilterActionSpec(ref common.ReferenceCa
},
},
"sourceLabels": {
+ VendorExtensible: spec.VendorExtensible{
+ Extensions: spec.Extensions{
+ "x-kubernetes-list-type": "set",
+ },
+ },
SchemaProps: spec.SchemaProps{
- Type: []string{"array"},
+ Description: "SourceLabels are the labels of each metric",
+ Type: []string{"array"},
Items: &spec.SchemaOrArray{
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
index 2d78a43f..914cb375 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
@@ -1,9 +1,12 @@
package prometheusremoteendpoint
import (
+ "bytes"
"context"
"encoding/json"
+ "net/http"
"strconv"
+ "strings"
onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
@@ -79,9 +82,8 @@ type ReconcilePrometheusRemoteEndpoint struct {
scheme *runtime.Scheme
}
-// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read
-// and what is in the PrometheusRemoteEndpoint.Spec
-
+// Reconcile reads the state of the cluster for a PrometheusRemoteEndpoint object
+// and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -105,8 +107,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
return reconcile.Result{}, err
}
- isBeingDeleted := checkDeletionTimestamp(reqLogger, instance)
- if isBeingDeleted {
+ // Check if the CR is being deleted
+ if instance.GetDeletionTimestamp() != nil {
//Delete Remote write
if err := r.processDeletionRequest(reqLogger, instance); err != nil {
reqLogger.Error(err, "Error processing deletion request")
@@ -139,7 +141,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
prom := &monitoringv1.Prometheus{}
- if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil {
+ pName := instance.ObjectMeta.Labels["app"]
+ if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil {
reqLogger.Error(err1, "Error getting prometheus")
return err1
}
@@ -148,13 +151,20 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
var patch []byte
rws := prom.Spec.RemoteWrite
- adapterURL := instance.Spec.AdapterUrl
+ remoteURL, id, err := getAdapterInfo(instance)
+ instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
+ if err != nil {
+ reqLogger.Error(err, "Unable to get adapter url")
+ return err
+ }
isUpdate := false
for i, spec := range rws {
- if spec.URL == instance.Spec.AdapterUrl {
+ // Update event - check the Prometheus remote write URL against RemoteURL in the Status
+ // to handle the case where the URL itself is updated.
+ if spec.URL == instance.Status.RemoteURL {
reqLogger.Info("Remote write already exists, updating it")
- patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger)
+ patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger)
isUpdate = true
break
}
@@ -163,23 +173,21 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
if !isUpdate {
reqLogger.Info("Remote write does not exist, creating one...")
// rwsLength := len(rws)
- patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger)
+ patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger)
}
patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
if patchErr != nil {
reqLogger.Error(patchErr, "Unable to process patch to prometheus")
+ cleanUpExternalResources(instance)
+ r.updateStatus("Error", instanceKey, "", "", "")
return patchErr
}
- reqLogger.Info("Patch merged")
+ r.updateStatus("Enabled", instanceKey, pName, remoteURL, id)
+ reqLogger.V(1).Info("Patch merged")
return nil
}
-func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool {
- isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
- return isMarkedForDeletion
-}
-
func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
prom := &monitoringv1.Prometheus{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
@@ -189,13 +197,17 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
reqLogger.Info("Found prometheus to update")
var patch []byte
- adapterURL := instance.Spec.AdapterUrl
+ remoteURL, _, err := getAdapterInfo(instance)
+ if err != nil {
+ reqLogger.Error(err, "Unable to get adapter info")
+ return err
+ }
rws := prom.Spec.RemoteWrite
for i, spec := range rws {
- if spec.URL == instance.Spec.AdapterUrl {
+ if spec.URL == remoteURL {
reqLogger.Info("Found remote write to be removed, removing it")
- patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger)
+ patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger)
break
}
}
@@ -204,8 +216,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
reqLogger.Error(patchErr, "Unable to process patch to prometheus")
return patchErr
}
- reqLogger.Info("Patch merged, remote write removed")
-
+ reqLogger.V(1).Info("Patch merged, remote write removed")
+ cleanUpExternalResources(instance)
//remove Finalizer after deletion
if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
if err := removeFinalizer(reqLogger, instance); err != nil {
@@ -252,3 +264,85 @@ func formPatch(method string, index string, adapterURL string, instance *onapv1a
finalMergePatch := append(prependMergePatch, 93)
return finalMergePatch, nil
}
+
+func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
+ // Fetch the PrometheusRemoteEndpoint instance
+ instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
+ err := r.client.Get(context.TODO(), key, instance)
+ if err != nil {
+ return err
+ }
+ instance.Status.Status = status
+ instance.Status.PrometheusInstance = prom
+ instance.Status.KafkaWriterID = kwid
+ instance.Status.RemoteURL = remoteURL
+ err = r.client.Status().Update(context.TODO(), instance)
+ return err
+}
+
+func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
+ if instance.Spec.Type == "kafka" {
+ deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
+ }
+}
+
+func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
+ switch strings.ToLower(instance.Spec.Type) {
+ case "m3db":
+ return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
+ case "kafka":
+ kwid, err := getKafkaWriter(instance)
+ return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
+ default:
+ return instance.Spec.AdapterURL, "", nil
+ }
+}
+
+func deleteKafkaWriter(kwURL string) error {
+ client := &http.Client{}
+ req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
+ if err != nil {
+ log.Error(err, "Failed to form delete Kafka Writer request")
+ return err
+ }
+ _, err = client.Do(req)
+ if err != nil {
+ log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
+ return err
+ }
+ return nil
+}
+
+func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+ // TODO - check update events
+ if instance.Status.KafkaWriterID != "" {
+ return instance.Status.KafkaWriterID, nil
+ }
+ return createKafkaWriter(instance)
+}
+
+func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+
+ log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
+ baseURL := instance.Spec.AdapterURL
+ kwc := instance.Spec.KafkaConfig
+ kwURL := baseURL + "/pkw"
+
+ postBody, err := json.Marshal(kwc)
+ if err != nil {
+ log.Error(err, "JSON Marshalling error")
+ return "", err
+ }
+
+ resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
+ if err != nil {
+ log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
+ return "", err
+ }
+ defer resp.Body.Close()
+ var kwid string
+ json.NewDecoder(resp.Body).Decode(&kwid)
+ log.Info("Kafka Writer created", "Kafka Writer Id", kwid)
+
+ return kwid, err
+}
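To make the URL handling in getAdapterInfo easier to follow, here is a small standalone sketch (assumptions noted inline) of the remote-write URLs it derives for each endpoint type.

package main

import "fmt"

func main() {
	adapter := "http://prom-kafka-adapter:8686" // assumed adapter base URL
	kwid := "3f2c9a"                            // assumed ID returned by POST <adapter>/pkw

	fmt.Println(adapter + "/api/v1/prom/remote/write") // Type "m3db"
	fmt.Println(adapter + "/pkw/" + kwid + "/receive") // Type "kafka"
	fmt.Println(adapter)                               // any other Type: adapter URL used as-is
}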