diff options
author | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-11-08 11:26:47 -0800 |
---|---|---|
committer | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-11-13 18:08:55 +0000 |
commit | fb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (patch) | |
tree | 427183f34e0f12b56be05c9175dbdf83bb5a1234 /vnfs/DAaaS/microservices/remote-config-operator/pkg | |
parent | 2026cb5283fbc44a4f68641f6e85628381ebda04 (diff) |
Remote Write Config and Filter operator
Added support for Kafka Remote writer
Issue-ID: ONAPARC-393
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Change-Id: I46555a15b0f326ffcd305d28e82d244c86a34644
Diffstat (limited to 'vnfs/DAaaS/microservices/remote-config-operator/pkg')
5 files changed, 172 insertions, 58 deletions
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go index 71b38dc2..bc633c1a 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/prometheusremoteendpoint_types.go @@ -4,15 +4,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type Metadata struct { - Name string `json:"Name"` - Labels []string `json:"Labels"` -} - // PrometheusRemoteEndpointSpec defines the desired state of PrometheusRemoteEndpoint // +k8s:openapi-gen=true type PrometheusRemoteEndpointSpec struct { - AdapterUrl string `json:"adapterUrl"` + AdapterURL string `json:"adapterURL"` FilterSelector *metav1.LabelSelector `json:"filterSelector,omitempty"` Type string `json:"type"` KafkaConfig string `json:"kafkaConfig,omitempty"` @@ -20,12 +15,16 @@ type PrometheusRemoteEndpointSpec struct { RemoteTimeout string `json:"remoteTimeout,omitempty"` } +// KafkaConfig - defines the desired remote kafka writer configurations type KafkaConfig struct { - BrokerUrl string `json:"brokerUrl"` - Group string `json:"group,omitempty"` - Topic string `json:"topic"` + BrokerURL string `json:"bootstrap.servers"` + Topic string `json:"topic"` + UsePartition bool `json:"usePartition"` + BatchMsgNum int `json:"batch.num.messages,omitempty` + Compression string `json:"compression.codec,omitempty` } +// QueueConfig - defines the prometheus remote write queue configurations type QueueConfig struct { BatchSendDeadline string `json:"batchSendDeadline,omitempty"` Capacity string `json:"capacity,omitempty"` @@ -39,10 +38,16 @@ type QueueConfig struct { // PrometheusRemoteEndpointStatus defines the observed state of PrometheusRemoteEndpoint // +k8s:openapi-gen=true +// +kubebuilder:subresource:status type PrometheusRemoteEndpointStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html + // Status can be Error, Enabled + PrometheusInstance string `json:"prometheusInstance,omitempty"` + Status string `json:"status"` + KafkaWriterID string `json:"kafkaWriterID,omitempty"` + RemoteURL string `json:"remoteURL,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go index 18b3dc6d..4c09698c 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/remotefilteraction_types.go @@ -7,8 +7,10 @@ import ( // RemoteFilterActionSpec defines the desired state of RemoteFilterAction // +k8s:openapi-gen=true type RemoteFilterActionSpec struct { - Action string `json:"action,omitempty"` - Regex string `json:"regex,omitempty"` + Action string `json:"action,omitempty"` + Regex string `json:"regex,omitempty"` + // SourceLabels are the labels of the each metric + // +listType=set SourceLabels []string `json:"sourceLabels,omitempty"` TargetLabel string `json:"targetLabel,omitempty"` Replacement string `json:"replacement,omitempty"` diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go index 9fb59405..15aef9f7 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.deepcopy.go @@ -26,27 +26,6 @@ func (in *KafkaConfig) DeepCopy() *KafkaConfig { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Metadata) DeepCopyInto(out *Metadata) { - *out = *in - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make([]string, len(*in)) - copy(*out, *in) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Metadata. -func (in *Metadata) DeepCopy() *Metadata { - if in == nil { - return nil - } - out := new(Metadata) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PrometheusRemoteEndpoint) DeepCopyInto(out *PrometheusRemoteEndpoint) { *out = *in out.TypeMeta = in.TypeMeta @@ -78,7 +57,7 @@ func (in *PrometheusRemoteEndpoint) DeepCopyObject() runtime.Object { func (in *PrometheusRemoteEndpointList) DeepCopyInto(out *PrometheusRemoteEndpointList) { *out = *in out.TypeMeta = in.TypeMeta - out.ListMeta = in.ListMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) if in.Items != nil { in, out := &in.Items, &out.Items *out = make([]PrometheusRemoteEndpoint, len(*in)) @@ -193,7 +172,7 @@ func (in *RemoteFilterAction) DeepCopyObject() runtime.Object { func (in *RemoteFilterActionList) DeepCopyInto(out *RemoteFilterActionList) { *out = *in out.TypeMeta = in.TypeMeta - out.ListMeta = in.ListMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) if in.Items != nil { in, out := &in.Items, &out.Items *out = make([]RemoteFilterAction, len(*in)) diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go index ca4c9511..3dba690e 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go @@ -71,7 +71,7 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointSpec(ref common.Refer Description: "PrometheusRemoteEndpointSpec defines the desired state of PrometheusRemoteEndpoint", Type: []string{"object"}, Properties: map[string]spec.Schema{ - "adapterUrl": { + "adapterURL": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, Format: "", @@ -106,7 +106,7 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointSpec(ref common.Refer }, }, }, - Required: []string{"adapterUrl", "type"}, + Required: []string{"adapterURL", "type"}, }, }, Dependencies: []string{ @@ -120,6 +120,34 @@ func schema_pkg_apis_onap_v1alpha1_PrometheusRemoteEndpointStatus(ref common.Ref SchemaProps: spec.SchemaProps{ Description: "PrometheusRemoteEndpointStatus defines the observed state of PrometheusRemoteEndpoint", Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "prometheusInstance": { + SchemaProps: spec.SchemaProps{ + Description: "INSERT ADDITIONAL STATUS FIELD - define observed state of cluster Important: Run \"operator-sdk generate k8s\" to regenerate code after modifying this file Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html Status can be Error, Enabled", + Type: []string{"string"}, + Format: "", + }, + }, + "status": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "kafkaWriterID": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "remoteURL": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"status"}, }, }, } @@ -189,8 +217,14 @@ func schema_pkg_apis_onap_v1alpha1_RemoteFilterActionSpec(ref common.ReferenceCa }, }, "sourceLabels": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "set", + }, + }, SchemaProps: spec.SchemaProps{ - Type: []string{"array"}, + Description: "SourceLabels are the labels of the each metric", + Type: []string{"array"}, Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go index 2d78a43f..914cb375 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go @@ -1,9 +1,12 @@ package prometheusremoteendpoint import ( + "bytes" "context" "encoding/json" + "net/http" "strconv" + "strings" onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1" @@ -79,9 +82,8 @@ type ReconcilePrometheusRemoteEndpoint struct { scheme *runtime.Scheme } -// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read -// and what is in the PrometheusRemoteEndpoint.Spec - +// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object +// and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec // Note: // The Controller will requeue the Request to be processed again if the returned error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. @@ -105,8 +107,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) return reconcile.Result{}, err } - isBeingDeleted := checkDeletionTimestamp(reqLogger, instance) - if isBeingDeleted { + // Check if CR is being Deleted + if instance.GetDeletionTimestamp() != nil { //Delete Remote write if err := r.processDeletionRequest(reqLogger, instance); err != nil { reqLogger.Error(err, "Error processing deletion request") @@ -139,7 +141,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error { prom := &monitoringv1.Prometheus{} - if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil { + pName := instance.ObjectMeta.Labels["app"] + if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil { reqLogger.Error(err1, "Error getting prometheus") return err1 } @@ -148,13 +151,20 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L var patch []byte rws := prom.Spec.RemoteWrite - adapterURL := instance.Spec.AdapterUrl + remoteURL, id, err := getAdapterInfo(instance) + instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name} + if err != nil { + reqLogger.Error(err, "Unable to get adapter url") + return err + } isUpdate := false for i, spec := range rws { - if spec.URL == instance.Spec.AdapterUrl { + // Update event - check the prometheus remote write Url against remoteURL in the Status + // to consider the case when URL itself is updated. + if spec.URL == instance.Status.RemoteURL { reqLogger.Info("Remote write already exists, updating it") - patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger) + patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger) isUpdate = true break } @@ -163,23 +173,21 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L if !isUpdate { reqLogger.Info("Remote write does not exist, creating one...") // rwsLength := len(rws) - patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger) + patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger) } patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch)) if patchErr != nil { reqLogger.Error(patchErr, "Unable to process patch to prometheus") + cleanUpExternalResources(instance) + r.updateStatus("Error", instanceKey, "", "", "") return patchErr } - reqLogger.Info("Patch merged") + r.updateStatus("Enabled", instanceKey, pName, remoteURL, id) + reqLogger.V(1).Info("Patch merged") return nil } -func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool { - isMarkedForDeletion := instance.GetDeletionTimestamp() != nil - return isMarkedForDeletion -} - func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error { prom := &monitoringv1.Prometheus{} if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil { @@ -189,13 +197,17 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log reqLogger.Info("Found prometheus to update") var patch []byte - adapterURL := instance.Spec.AdapterUrl + remoteURL, _, err := getAdapterInfo(instance) + if err != nil { + reqLogger.Error(err, "Unable to get adapter info") + return err + } rws := prom.Spec.RemoteWrite for i, spec := range rws { - if spec.URL == instance.Spec.AdapterUrl { + if spec.URL == remoteURL { reqLogger.Info("Found remote write to be removed, removing it") - patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger) + patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger) break } } @@ -204,8 +216,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log reqLogger.Error(patchErr, "Unable to process patch to prometheus") return patchErr } - reqLogger.Info("Patch merged, remote write removed") - + reqLogger.V(1).Info("Patch merged, remote write removed") + cleanUpExternalResources(instance) //remove Finalizer after deletion if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) { if err := removeFinalizer(reqLogger, instance); err != nil { @@ -252,3 +264,85 @@ func formPatch(method string, index string, adapterURL string, instance *onapv1a finalMergePatch := append(prependMergePatch, 93) return finalMergePatch, nil } + +func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error { + // Fetch the CollectdGlobal instance + instance := &onapv1alpha1.PrometheusRemoteEndpoint{} + err := r.client.Get(context.TODO(), key, instance) + if err != nil { + return err + } + instance.Status.Status = status + instance.Status.PrometheusInstance = prom + instance.Status.KafkaWriterID = kwid + instance.Status.RemoteURL = remoteURL + err = r.client.Status().Update(context.TODO(), instance) + return err +} + +func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) { + if instance.Spec.Type == "kafka" { + deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID) + } +} + +func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) { + switch strings.ToLower(instance.Spec.Type) { + case "m3db": + return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil + case "kafka": + kwid, err := getKafkaWriter(instance) + return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err + default: + return instance.Spec.AdapterURL, "", nil + } +} + +func deleteKafkaWriter(kwURL string) error { + client := &http.Client{} + req, err := http.NewRequest(http.MethodDelete, kwURL, nil) + if err != nil { + log.Error(err, "Failed to form delete Kafka Writer request") + return err + } + _, err = client.Do(req) + if err != nil { + log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL) + return err + } + return nil +} + +func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) { + // TODO - check update events + if instance.Status.KafkaWriterID != "" { + return instance.Status.KafkaWriterID, nil + } + return createKafkaWriter(instance) +} + +func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) { + + log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec) + baseURL := instance.Spec.AdapterURL + kwc := instance.Spec.KafkaConfig + kwURL := baseURL + "/pkw" + + postBody, err := json.Marshal(kwc) + if err != nil { + log.Error(err, "JSON Marshalling error") + return "", err + } + + resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody)) + if err != nil { + log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc) + return "", err + } + defer resp.Body.Close() + var kwid string + json.NewDecoder(resp.Body).Decode(&kwid) + log.Info("Kafka Writer created", "Kafka Writer Id", kwid) + + return kwid, err +} |