aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go')
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go83
1 files changed, 83 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
new file mode 100644
index 00000000..42804321
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "encoding/json"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/prompb"
+ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+ logger "prom-kafka-writer/pkg/config"
+)
+
+var log = logger.GetLoggerInstance()
+
+func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error {
+ log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries)
+ for _, ts := range metrics.Timeseries {
+ m := make(model.Metric, len(ts.Labels))
+ for _, l := range ts.Labels {
+ m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+ }
+ log.Debugw("Labels", "Labelset", m)
+
+ for _, s := range ts.Samples {
+ log.Debugf(" %f %d\n", s.Value, s.Timestamp)
+ metric := map[string]interface{}{
+ "name": m["__name__"],
+ "labels": m,
+ "timestamp": s.Timestamp,
+ "value": s.Value,
+ }
+ key := string(m["__name__"])
+ jsonMetric, err := json.Marshal(metric)
+ if err != nil {
+ log.Errorw("Marshal error", "error", err.Error())
+ continue
+ }
+ err = publish(kwid, key, jsonMetric)
+ if err != nil {
+ log.Error("Failed to produce message")
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func publish(kwid string, key string, jsonMetric []byte) error {
+ var (
+ kwp = KWMap[kwid].Producer
+ kwc = KWMap[kwid].Config
+ )
+
+ tp := getTopicPartition(kwc)
+ kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric}
+ err := kwp.Produce(&kwMsg, nil)
+ if err != nil {
+ log.Errorw("Kafka Producer Error", "error", err.Error())
+ }
+ return err
+}
+
+func getTopicPartition(kwc KWConfig) kafka.TopicPartition {
+ p := kafka.PartitionAny
+ if kwc.UsePartition {
+ // TODO: Implement partition strategy
+ p = kafka.PartitionAny
+ }
+ return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p}
+}