aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/prom-kafka-writer/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg')
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go143
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go289
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go33
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go38
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go129
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go229
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go83
7 files changed, 944 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go
new file mode 100644
index 00000000..d7a2b898
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "encoding/json"
+ "errors"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ logger "prom-kafka-writer/pkg/config"
+ kw "prom-kafka-writer/pkg/kafkawriter"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/snappy"
+ "github.com/gorilla/mux"
+ "github.com/prometheus/prometheus/prompb"
+)
+
+type kwResponse struct {
+ KWid string `json:"kwid,omitempty"`
+ KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"`
+}
+
+var log = logger.GetLoggerInstance()
+
+// CreateKWHandler - Creates and starts a Prometheus to Kafka writer
+func CreateKWHandler(w http.ResponseWriter, r *http.Request) {
+ log.Infow("Received request for Creating Kafka Writer")
+ kwConfig := kw.NewKWConfig()
+ dec := json.NewDecoder(r.Body)
+ dec.DisallowUnknownFields()
+ err := dec.Decode(kwConfig)
+ switch {
+ case err == io.EOF:
+ http.Error(w, "Body empty", http.StatusBadRequest)
+ return
+ case err != nil:
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
+ kwid, err := kw.AddKWC(kwConfig)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ kwResp := kwResponse{
+ KWid: kwid,
+ }
+ err = json.NewEncoder(w).Encode(kwResp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+// ListKWHandler - Lists the KafkaWriters and its config
+func ListKWHandler(w http.ResponseWriter, r *http.Request) {
+ log.Infow("Received request for List Kafka Writers", "url", r.URL)
+ res := kw.ListKWC()
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ kwResp := kwResponse{
+ KWCRespMap: res,
+ }
+ err := json.NewEncoder(w).Encode(kwResp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+// DeleteKWHandler - Deletes a given Prometheus to Kafka writer
+func DeleteKWHandler(w http.ResponseWriter, r *http.Request) {
+ params := mux.Vars(r)
+ log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"])
+ kw.DeleteKWC(params["kwid"])
+
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+}
+
+// ReceiveKWHandler - Publish metrics from Prometheus to Kafka
+func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) {
+ params := mux.Vars(r)
+ kwid := params["kwid"]
+ if _, ok := kw.KWMap[kwid]; !ok {
+ notRegisteredErr := errors.New("kafka writer not registered").Error()
+ log.Error(notRegisteredErr)
+ http.Error(w, notRegisteredErr, http.StatusNotFound)
+ return
+ }
+ log.Infow("Produce message on Kafka Writer", "kwid", kwid)
+
+ compressed, err := ioutil.ReadAll(r.Body)
+ defer r.Body.Close()
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ metricBuf, err := snappy.Decode(nil, compressed)
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ var metrics prompb.WriteRequest
+ if err := proto.Unmarshal(metricBuf, &metrics); err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ err = kw.PublishTimeSeries(kwid, &metrics)
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go
new file mode 100644
index 00000000..19c4c0ab
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go
@@ -0,0 +1,289 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/snappy"
+ "github.com/prometheus/prometheus/prompb"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "prom-kafka-writer/pkg/kafkawriter"
+ "testing"
+)
+
+type errReader int
+
+func (errReader) Read(p []byte) (n int, err error) {
+ return 0, errors.New("test error")
+}
+
+func TestCreateKWHandler(t *testing.T) {
+ tests := []struct {
+ name string
+ body io.Reader
+ expectStatus int
+ expectResp *kwResponse
+ }{
+ {
+ name: "Test Create Kafka Writer",
+ body: bytes.NewBuffer([]byte(`{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "compression.codec": "snappy"
+ }`)),
+ expectStatus: http.StatusCreated,
+ expectResp: &kwResponse{
+ KWid: "pkw0",
+ },
+ },
+ {
+ name: "Test Create Kafka Writer Wrong parameters",
+ body: bytes.NewBuffer([]byte(`{
+ "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "kafkatopic": "adatopic1",
+ "usePartition": false,
+ "compression.codec": "snappy"
+ }`)),
+ expectStatus: http.StatusUnprocessableEntity,
+ expectResp: &kwResponse{},
+ },
+ {
+ name: "Test Create Kafka Writer Empty Body",
+ body: bytes.NewBuffer([]byte(nil)),
+ expectStatus: http.StatusBadRequest,
+ expectResp: &kwResponse{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ req := httptest.NewRequest("POST", "/pkw", tt.body)
+ rec := httptest.NewRecorder()
+ r := NewRouter()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ kwResp := &kwResponse{}
+ json.NewDecoder(resp.Body).Decode(&kwResp)
+ assert.Equal(t, tt.expectResp, kwResp)
+ })
+ }
+}
+
+func TestListKWHandler(t *testing.T) {
+
+ tests := []struct {
+ name string
+ body string
+ expectStatus int
+ expectResp *kwResponse
+ }{
+ {
+ name: "Test List Kafka Writers",
+ body: `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`,
+ expectStatus: http.StatusOK,
+ expectResp: &kwResponse{
+ KWCRespMap: map[string]kafkawriter.KWConfig{
+ "pkw0": {
+ Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ Topic: "adatopic1",
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "snappy",
+ },
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ preCreateKW("pkw0", tt.body)
+ req := httptest.NewRequest("GET", "/pkw", nil)
+ rec := httptest.NewRecorder()
+ r := NewRouter()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ kwResp := &kwResponse{}
+ json.NewDecoder(resp.Body).Decode(&kwResp)
+ assert.Equal(t, tt.expectResp, kwResp)
+ })
+ }
+}
+
+func TestDeleteKWHandler(t *testing.T) {
+ tests := []struct {
+ name string
+ kwid string
+ expectStatus int
+ }{
+ {
+ name: "Test Delete Kafka Writer",
+ kwid: "pkw777",
+ expectStatus: http.StatusOK,
+ },
+ }
+ body := `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ preCreateKW(tt.kwid, body)
+ target := fmt.Sprintf("/pkw/%s", tt.kwid)
+ req := httptest.NewRequest("DELETE", target, nil)
+ r := NewRouter()
+ rec := httptest.NewRecorder()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ })
+ }
+}
+
+func preCreateKW(kwid string, body string) {
+ kafkawriter.Cleanup()
+ k := []byte(body)
+ kwc := &kafkawriter.KWConfig{}
+ _ = json.Unmarshal(k, kwc)
+ producer, _ := kafkawriter.NewKafkaWriter(kwc)
+ kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer}
+}
+
+func TestReceiveKWHandler(t *testing.T) {
+ f, err := buildRemoteWriteRequest()
+ if err != nil {
+ t.Fatal("Could not build prompb.WriteRequest")
+ }
+ tests := []struct {
+ name string
+ kwid string
+ body io.Reader
+ preCreate bool
+ expectStatus int
+ }{
+ {
+ name: "Test Receive Messages Empty Message",
+ kwid: "pkw111",
+ preCreate: true,
+ expectStatus: http.StatusBadRequest,
+ },
+ {
+ name: "Test Receive Messages",
+ kwid: "pkw111",
+ preCreate: true,
+ body: bytes.NewReader(f),
+ expectStatus: http.StatusOK,
+ },
+ {
+ name: "Test Receive Messages Kafka Writer Not registed",
+ kwid: "pkw222",
+ preCreate: false,
+ expectStatus: http.StatusNotFound,
+ },
+ {
+ name: "Test Receive Messages Kafka Writer Not registed",
+ kwid: "pkw111",
+ preCreate: true,
+ body: errReader(0),
+ expectStatus: http.StatusInternalServerError,
+ },
+ }
+ body := `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.preCreate {
+ preCreateKW(tt.kwid, body)
+ }
+ target := fmt.Sprintf("/pkw/%s/receive", tt.kwid)
+ req := httptest.NewRequest("POST", target, tt.body)
+ r := NewRouter()
+ rec := httptest.NewRecorder()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ })
+ }
+}
+
+func buildRemoteWriteRequest() ([]byte, error) {
+ var buf []byte
+ samples := []*prompb.TimeSeries{
+ &prompb.TimeSeries{
+ Labels: []*prompb.Label{
+ &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"},
+ &prompb.Label{Name: "endpoint", Value: "http"},
+ &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"},
+ &prompb.Label{Name: "job", Value: "prom-kafka-writer"},
+ &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"},
+ &prompb.Label{Name: "namespace", Value: "edge1"},
+ &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"},
+ &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"},
+ &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"},
+ &prompb.Label{Name: "service", Value: "prom-kafka-writer"},
+ },
+ Samples: []prompb.Sample{
+ prompb.Sample{
+ Value: 17,
+ Timestamp: 1572479934007,
+ },
+ prompb.Sample{
+ Value: 19,
+ Timestamp: 1572480144007,
+ },
+ },
+ },
+ }
+ req := &prompb.WriteRequest{
+ Timeseries: samples,
+ }
+
+ data, err := proto.Marshal(req)
+ if err != nil {
+ return nil, err
+ }
+
+ // snappy uses len() to see if it needs to allocate a new slice. Make the
+ // buffer as long as possible.
+ if buf != nil {
+ buf = buf[0:cap(buf)]
+ }
+ compressed := snappy.Encode(buf, data)
+ return compressed, nil
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go
new file mode 100644
index 00000000..fb78afe2
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+func NewRouter() *mux.Router {
+ r := mux.NewRouter()
+ r.HandleFunc("/pkw", ListKWHandler).Methods("GET")
+ r.HandleFunc("/pkw", CreateKWHandler).Methods("POST")
+ r.HandleFunc("/pkw/{kwid}", DeleteKWHandler).Methods("DELETE")
+ r.HandleFunc("/pkw/{kwid}/receive", ReceiveKWHandler).Methods("POST")
+
+ // Metrics Handler for prom-kafka-writer
+ r.Handle("/metrics", promhttp.Handler())
+ return r
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go
new file mode 100644
index 00000000..2a5921f1
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package config
+
+import (
+ "go.uber.org/zap"
+ "log"
+ "sync"
+)
+
+var logOnce sync.Once
+var slogger *zap.SugaredLogger
+
+//GetLoggerInstance returns a singleton instance of logger
+func GetLoggerInstance() *zap.SugaredLogger {
+ logOnce.Do(func() {
+ logger, err := zap.NewProduction()
+ if err != nil {
+ log.Fatalf("can't initialize zap logger: %v", err)
+ }
+ defer logger.Sync()
+ slogger = logger.Sugar()
+ })
+ return slogger
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go
new file mode 100644
index 00000000..f56f66aa
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+ "strconv"
+ "sync"
+)
+
+//KWConfig - serialized type for config related to Kafka
+type KWConfig struct {
+ //Broker - Kafka Bootstrap servers (comma separated)
+ Broker string `json:"bootstrap.servers"`
+ //Topic - kafka topic name
+ Topic string `json:"topic"`
+ //UsePartition - Enforce use of partitions
+ UsePartition bool `json:"usePartition"`
+ BatchMsgNum int `json:"batch.num.messages,omitempty"`
+ Compression string `json:"compression.codec,omitempty"`
+}
+
+//KWProducer - holds the Kafka Config and associated Kafka Producer
+type KWProducer struct {
+ Config KWConfig
+ Producer *kafka.Producer
+}
+
+//KWRespMap packs the KWConfig and kwid for List Api
+type KWRespMap map[string]KWConfig
+
+//KWMap - Stores the Kafka Writer to Kafka Producer Mapping
+// This is used to uniquely identify a Kafka Writer - Producer mapping.
+var (
+ KWMap = make(map[string]KWProducer)
+ kwMutex sync.Mutex
+ id int
+)
+
+// NewKafkaWriter - creates a new producer using kafka config.
+// Handles the remote write from prometheus and send to kafka topic
+func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) {
+ producer, err := kafka.NewProducer(&kafka.ConfigMap{
+ "bootstrap.servers": kwc.Broker,
+ "compression.codec": kwc.Compression,
+ "batch.num.messages": kwc.BatchMsgNum,
+ "go.batch.producer": true,
+ "go.delivery.reports": false,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return producer, nil
+}
+
+//NewKWConfig - creates a KWConfig object with default values
+func NewKWConfig() *KWConfig {
+ return &KWConfig{
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ }
+}
+
+//NewKWRespMap - packs the KWConfig and kwid for List Api
+func newKWRespMap() KWRespMap {
+ kwr := make(KWRespMap)
+ return kwr
+}
+
+//AddKWC - Method to add KafkaWriterConfig request to KWMap
+func AddKWC(kwc *KWConfig) (string, error) {
+ kwMutex.Lock()
+ defer kwMutex.Unlock()
+ //TODO: Generate kwid
+ kwid := "pkw" + strconv.Itoa(id)
+ id++
+ producer, err := NewKafkaWriter(kwc)
+ if err != nil {
+ log.Error("Error", err)
+ id--
+ return "", err
+ }
+
+ KWMap[kwid] = KWProducer{
+ Config: *kwc,
+ Producer: producer,
+ }
+ return kwid, nil
+}
+
+//DeleteKWC - Method to add KafkaWriter request to KWMap
+func DeleteKWC(kwid string) {
+ kwMutex.Lock()
+ defer kwMutex.Unlock()
+ if _, ok := KWMap[kwid]; ok {
+ KWMap[kwid].Producer.Close()
+ }
+ delete(KWMap, kwid)
+}
+
+//ListKWC - Method to add KafkaWriter request to KWMap
+func ListKWC() KWRespMap {
+ kwr := newKWRespMap()
+ for k, v := range KWMap {
+ kwr[k] = v.Config
+ }
+ return kwr
+}
+
+//Cleanup - Method to cleanup resources
+func Cleanup() {
+ for k := range KWMap {
+ DeleteKWC(k)
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go
new file mode 100644
index 00000000..6869452f
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "github.com/stretchr/testify/assert"
+ "reflect"
+ "testing"
+)
+
+func TestNewKafkaWriter(t *testing.T) {
+ type args struct {
+ kwc *KWConfig
+ }
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+
+ kwc2 := NewKWConfig()
+ kwc2.Broker = "localhost:9092"
+ kwc2.Topic = "metrics"
+ kwc2.BatchMsgNum = 0
+
+ tests := []struct {
+ name string
+ args args
+ want interface{}
+ }{
+ {
+ name: "Test New Kafka Writer",
+ args: args{kwc},
+ want: "rdkafka#producer-1",
+ },
+ {
+ name: "Test New Kafka Writer Wrong Config",
+ args: args{kwc2},
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, _ := NewKafkaWriter(tt.args.kwc)
+ if tt.want == nil {
+ assert.Equal(t, tt.want, nil)
+ } else {
+ if !reflect.DeepEqual(got.String(), tt.want) {
+ t.Errorf("NewKafkaWriter() = %v, want %v", got, tt.want)
+ }
+ }
+ })
+ }
+}
+
+func TestNewKWConfig(t *testing.T) {
+ tests := []struct {
+ name string
+ want *KWConfig
+ }{
+ {
+ name: "Test New Kafka Config",
+ want: &KWConfig{
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := NewKWConfig(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("NewKWConfig() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestAddKWC(t *testing.T) {
+ type args struct {
+ kwc *KWConfig
+ }
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "Test Add Kafka Writer 1",
+ args: args{kwc},
+ want: "pkw0",
+ },
+ {
+ name: "Test Add Kafka Writer 2 ",
+ args: args{kwc},
+ want: "pkw1",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, _ := AddKWC(tt.args.kwc)
+ assert.Equal(t, tt.want, got)
+
+ })
+ }
+ assert.Equal(t, 2, len(KWMap))
+}
+
+func TestDeleteKWC(t *testing.T) {
+ type args struct {
+ kwid string
+ }
+ tests := []struct {
+ name string
+ args args
+ delcount int
+ }{
+ {
+ name: "Test Delete Kafka Writer 1",
+ args: args{"pkw0"},
+ delcount: 1,
+ },
+ {
+ name: "Test Delete Kafka Writer Non existent",
+ args: args{"pkw3"},
+ delcount: 0,
+ },
+ {
+ name: "Test Delete Kafka Writer 2",
+ args: args{"pkw1"},
+ delcount: 1,
+ },
+ }
+ for _, tt := range tests {
+ l := len(KWMap)
+ t.Run(tt.name, func(t *testing.T) {
+ DeleteKWC(tt.args.kwid)
+ })
+ assert.Equal(t, l-tt.delcount, len(KWMap))
+ }
+ assert.Equal(t, 0, len(KWMap))
+}
+
+func TestListKWC(t *testing.T) {
+ tests := []struct {
+ name string
+ init func() string
+ want KWRespMap
+ clean func(string)
+ }{
+ {
+ name: "Test List Kafka Writers Empty",
+ want: KWRespMap{"pkw2": {
+ Broker: "localhost:9092",
+ Topic: "metrics",
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ }},
+ init: func() string {
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ id, _ := AddKWC(kwc)
+ return id
+ },
+ clean: func(id string) {
+ DeleteKWC(id)
+ },
+ },
+ {
+ name: "Test List Kafka Writers Empty",
+ want: KWRespMap{},
+ init: func() string {
+ return ""
+ },
+ clean: func(string) {},
+ },
+ }
+ for _, tt := range tests {
+ id := tt.init()
+ t.Run(tt.name, func(t *testing.T) {
+ if got := ListKWC(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("ListKWC() = %v, want %v", got, tt.want)
+ }
+ })
+ tt.clean(id)
+ }
+}
+
+func TestCleanup(t *testing.T) {
+ tests := []struct {
+ name string
+ init func()
+ }{
+ {
+ name: "Test List Kafka Writers Empty",
+ init: func() {
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ AddKWC(kwc)
+ AddKWC(kwc)
+ AddKWC(kwc)
+ },
+ },
+ }
+ for _, tt := range tests {
+ tt.init()
+ t.Run(tt.name, func(t *testing.T) {
+ Cleanup()
+ })
+ assert.Equal(t, 0, len(KWMap))
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
new file mode 100644
index 00000000..42804321
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "encoding/json"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/prompb"
+ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+ logger "prom-kafka-writer/pkg/config"
+)
+
+var log = logger.GetLoggerInstance()
+
+func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error {
+ log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries)
+ for _, ts := range metrics.Timeseries {
+ m := make(model.Metric, len(ts.Labels))
+ for _, l := range ts.Labels {
+ m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+ }
+ log.Debugw("Labels", "Labelset", m)
+
+ for _, s := range ts.Samples {
+ log.Debugf(" %f %d\n", s.Value, s.Timestamp)
+ metric := map[string]interface{}{
+ "name": m["__name__"],
+ "labels": m,
+ "timestamp": s.Timestamp,
+ "value": s.Value,
+ }
+ key := string(m["__name__"])
+ jsonMetric, err := json.Marshal(metric)
+ if err != nil {
+ log.Errorw("Marshal error", "error", err.Error())
+ continue
+ }
+ err = publish(kwid, key, jsonMetric)
+ if err != nil {
+ log.Error("Failed to produce message")
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func publish(kwid string, key string, jsonMetric []byte) error {
+ var (
+ kwp = KWMap[kwid].Producer
+ kwc = KWMap[kwid].Config
+ )
+
+ tp := getTopicPartition(kwc)
+ kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric}
+ err := kwp.Produce(&kwMsg, nil)
+ if err != nil {
+ log.Errorw("Kafka Producer Error", "error", err.Error())
+ }
+ return err
+}
+
+func getTopicPartition(kwc KWConfig) kafka.TopicPartition {
+ p := kafka.PartitionAny
+ if kwc.UsePartition {
+ // TODO: Implement partition strategy
+ p = kafka.PartitionAny
+ }
+ return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p}
+}