aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go')
-rw-r--r--vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go136
1 files changed, 115 insertions, 21 deletions
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
index 2d78a43f..914cb375 100644
--- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
+++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
@@ -1,9 +1,12 @@
package prometheusremoteendpoint
import (
+ "bytes"
"context"
"encoding/json"
+ "net/http"
"strconv"
+ "strings"
onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
@@ -79,9 +82,8 @@ type ReconcilePrometheusRemoteEndpoint struct {
scheme *runtime.Scheme
}
-// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read
-// and what is in the PrometheusRemoteEndpoint.Spec
-
+// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object
+// and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -105,8 +107,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
return reconcile.Result{}, err
}
- isBeingDeleted := checkDeletionTimestamp(reqLogger, instance)
- if isBeingDeleted {
+ // Check if CR is being Deleted
+ if instance.GetDeletionTimestamp() != nil {
//Delete Remote write
if err := r.processDeletionRequest(reqLogger, instance); err != nil {
reqLogger.Error(err, "Error processing deletion request")
@@ -139,7 +141,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
prom := &monitoringv1.Prometheus{}
- if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil {
+ pName := instance.ObjectMeta.Labels["app"]
+ if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil {
reqLogger.Error(err1, "Error getting prometheus")
return err1
}
@@ -148,13 +151,20 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
var patch []byte
rws := prom.Spec.RemoteWrite
- adapterURL := instance.Spec.AdapterUrl
+ remoteURL, id, err := getAdapterInfo(instance)
+ instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
+ if err != nil {
+ reqLogger.Error(err, "Unable to get adapter url")
+ return err
+ }
isUpdate := false
for i, spec := range rws {
- if spec.URL == instance.Spec.AdapterUrl {
+ // Update event - check the prometheus remote write Url against remoteURL in the Status
+ // to consider the case when URL itself is updated.
+ if spec.URL == instance.Status.RemoteURL {
reqLogger.Info("Remote write already exists, updating it")
- patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger)
+ patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger)
isUpdate = true
break
}
@@ -163,23 +173,21 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
if !isUpdate {
reqLogger.Info("Remote write does not exist, creating one...")
// rwsLength := len(rws)
- patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger)
+ patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger)
}
patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
if patchErr != nil {
reqLogger.Error(patchErr, "Unable to process patch to prometheus")
+ cleanUpExternalResources(instance)
+ r.updateStatus("Error", instanceKey, "", "", "")
return patchErr
}
- reqLogger.Info("Patch merged")
+ r.updateStatus("Enabled", instanceKey, pName, remoteURL, id)
+ reqLogger.V(1).Info("Patch merged")
return nil
}
-func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool {
- isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
- return isMarkedForDeletion
-}
-
func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
prom := &monitoringv1.Prometheus{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
@@ -189,13 +197,17 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
reqLogger.Info("Found prometheus to update")
var patch []byte
- adapterURL := instance.Spec.AdapterUrl
+ remoteURL, _, err := getAdapterInfo(instance)
+ if err != nil {
+ reqLogger.Error(err, "Unable to get adapter info")
+ return err
+ }
rws := prom.Spec.RemoteWrite
for i, spec := range rws {
- if spec.URL == instance.Spec.AdapterUrl {
+ if spec.URL == remoteURL {
reqLogger.Info("Found remote write to be removed, removing it")
- patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger)
+ patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger)
break
}
}
@@ -204,8 +216,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
reqLogger.Error(patchErr, "Unable to process patch to prometheus")
return patchErr
}
- reqLogger.Info("Patch merged, remote write removed")
-
+ reqLogger.V(1).Info("Patch merged, remote write removed")
+ cleanUpExternalResources(instance)
//remove Finalizer after deletion
if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
if err := removeFinalizer(reqLogger, instance); err != nil {
@@ -252,3 +264,85 @@ func formPatch(method string, index string, adapterURL string, instance *onapv1a
finalMergePatch := append(prependMergePatch, 93)
return finalMergePatch, nil
}
+
+func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
+ // Fetch the CollectdGlobal instance
+ instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
+ err := r.client.Get(context.TODO(), key, instance)
+ if err != nil {
+ return err
+ }
+ instance.Status.Status = status
+ instance.Status.PrometheusInstance = prom
+ instance.Status.KafkaWriterID = kwid
+ instance.Status.RemoteURL = remoteURL
+ err = r.client.Status().Update(context.TODO(), instance)
+ return err
+}
+
+func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
+ if instance.Spec.Type == "kafka" {
+ deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
+ }
+}
+
+func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
+ switch strings.ToLower(instance.Spec.Type) {
+ case "m3db":
+ return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
+ case "kafka":
+ kwid, err := getKafkaWriter(instance)
+ return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
+ default:
+ return instance.Spec.AdapterURL, "", nil
+ }
+}
+
+func deleteKafkaWriter(kwURL string) error {
+ client := &http.Client{}
+ req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
+ if err != nil {
+ log.Error(err, "Failed to form delete Kafka Writer request")
+ return err
+ }
+ _, err = client.Do(req)
+ if err != nil {
+ log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
+ return err
+ }
+ return nil
+}
+
+func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+ // TODO - check update events
+ if instance.Status.KafkaWriterID != "" {
+ return instance.Status.KafkaWriterID, nil
+ }
+ return createKafkaWriter(instance)
+}
+
+func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+
+ log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
+ baseURL := instance.Spec.AdapterURL
+ kwc := instance.Spec.KafkaConfig
+ kwURL := baseURL + "/pkw"
+
+ postBody, err := json.Marshal(kwc)
+ if err != nil {
+ log.Error(err, "JSON Marshalling error")
+ return "", err
+ }
+
+ resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
+ if err != nil {
+ log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
+ return "", err
+ }
+ defer resp.Body.Close()
+ var kwid string
+ json.NewDecoder(resp.Body).Decode(&kwid)
+ log.Info("Kafka Writer created", "Kafka Writer Id", kwid)
+
+ return kwid, err
+}