From d11cb99fb101a6798fd57fa44d332737d6637e75 Mon Sep 17 00:00:00 2001 From: Dileep Ranganathan Date: Thu, 31 Oct 2019 15:46:37 -0700 Subject: Prometheus Kafka Writer Microservice This patch implents Prometheus to remote Kafka Writer Microservice Added sample day-2 config to configure prometheus PS4: Fixed Review comments Issue-ID: ONAPARC-393 Signed-off-by: Dileep Ranganathan Change-Id: I0bc77175593a165effd7bb1cb4802c988a5ef4ec --- .../prom-kafka-writer/pkg/api/handler.go | 143 ++++++++++ .../prom-kafka-writer/pkg/api/handler_test.go | 289 +++++++++++++++++++++ .../prom-kafka-writer/pkg/api/router.go | 33 +++ .../prom-kafka-writer/pkg/config/logger.go | 38 +++ .../pkg/kafkawriter/kafkawriter.go | 129 +++++++++ .../pkg/kafkawriter/kafkawriter_test.go | 229 ++++++++++++++++ .../prom-kafka-writer/pkg/kafkawriter/producer.go | 83 ++++++ 7 files changed, 944 insertions(+) create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go create mode 100644 vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg') diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go new file mode 100644 index 00000000..d7a2b898 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go @@ -0,0 +1,143 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + + logger "prom-kafka-writer/pkg/config" + kw "prom-kafka-writer/pkg/kafkawriter" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/gorilla/mux" + "github.com/prometheus/prometheus/prompb" +) + +type kwResponse struct { + KWid string `json:"kwid,omitempty"` + KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"` +} + +var log = logger.GetLoggerInstance() + +// CreateKWHandler - Creates and starts a Prometheus to Kafka writer +func CreateKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for Creating Kafka Writer") + kwConfig := kw.NewKWConfig() + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + err := dec.Decode(kwConfig) + switch { + case err == io.EOF: + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + kwid, err := kw.AddKWC(kwConfig) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + kwResp := kwResponse{ + KWid: kwid, + } + err = json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ListKWHandler - Lists the KafkaWriters and its config +func ListKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for List Kafka Writers", "url", r.URL) + res := kw.ListKWC() + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + kwResp := kwResponse{ + KWCRespMap: res, + } + err := json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// DeleteKWHandler - Deletes a given Prometheus to Kafka writer +func DeleteKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"]) + kw.DeleteKWC(params["kwid"]) + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} + +// ReceiveKWHandler - Publish metrics from Prometheus to Kafka +func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + kwid := params["kwid"] + if _, ok := kw.KWMap[kwid]; !ok { + notRegisteredErr := errors.New("kafka writer not registered").Error() + log.Error(notRegisteredErr) + http.Error(w, notRegisteredErr, http.StatusNotFound) + return + } + log.Infow("Produce message on Kafka Writer", "kwid", kwid) + + compressed, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + metricBuf, err := snappy.Decode(nil, compressed) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var metrics prompb.WriteRequest + if err := proto.Unmarshal(metricBuf, &metrics); err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = kw.PublishTimeSeries(kwid, &metrics) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go new file mode 100644 index 00000000..19c4c0ab --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go @@ -0,0 +1,289 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "net/http/httptest" + "prom-kafka-writer/pkg/kafkawriter" + "testing" +) + +type errReader int + +func (errReader) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} + +func TestCreateKWHandler(t *testing.T) { + tests := []struct { + name string + body io.Reader + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test Create Kafka Writer", + body: bytes.NewBuffer([]byte(`{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusCreated, + expectResp: &kwResponse{ + KWid: "pkw0", + }, + }, + { + name: "Test Create Kafka Writer Wrong parameters", + body: bytes.NewBuffer([]byte(`{ + "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "kafkatopic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusUnprocessableEntity, + expectResp: &kwResponse{}, + }, + { + name: "Test Create Kafka Writer Empty Body", + body: bytes.NewBuffer([]byte(nil)), + expectStatus: http.StatusBadRequest, + expectResp: &kwResponse{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", "/pkw", tt.body) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestListKWHandler(t *testing.T) { + + tests := []struct { + name string + body string + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test List Kafka Writers", + body: `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }`, + expectStatus: http.StatusOK, + expectResp: &kwResponse{ + KWCRespMap: map[string]kafkawriter.KWConfig{ + "pkw0": { + Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + Topic: "adatopic1", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "snappy", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW("pkw0", tt.body) + req := httptest.NewRequest("GET", "/pkw", nil) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestDeleteKWHandler(t *testing.T) { + tests := []struct { + name string + kwid string + expectStatus int + }{ + { + name: "Test Delete Kafka Writer", + kwid: "pkw777", + expectStatus: http.StatusOK, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW(tt.kwid, body) + target := fmt.Sprintf("/pkw/%s", tt.kwid) + req := httptest.NewRequest("DELETE", target, nil) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func preCreateKW(kwid string, body string) { + kafkawriter.Cleanup() + k := []byte(body) + kwc := &kafkawriter.KWConfig{} + _ = json.Unmarshal(k, kwc) + producer, _ := kafkawriter.NewKafkaWriter(kwc) + kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer} +} + +func TestReceiveKWHandler(t *testing.T) { + f, err := buildRemoteWriteRequest() + if err != nil { + t.Fatal("Could not build prompb.WriteRequest") + } + tests := []struct { + name string + kwid string + body io.Reader + preCreate bool + expectStatus int + }{ + { + name: "Test Receive Messages Empty Message", + kwid: "pkw111", + preCreate: true, + expectStatus: http.StatusBadRequest, + }, + { + name: "Test Receive Messages", + kwid: "pkw111", + preCreate: true, + body: bytes.NewReader(f), + expectStatus: http.StatusOK, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw222", + preCreate: false, + expectStatus: http.StatusNotFound, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw111", + preCreate: true, + body: errReader(0), + expectStatus: http.StatusInternalServerError, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preCreate { + preCreateKW(tt.kwid, body) + } + target := fmt.Sprintf("/pkw/%s/receive", tt.kwid) + req := httptest.NewRequest("POST", target, tt.body) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func buildRemoteWriteRequest() ([]byte, error) { + var buf []byte + samples := []*prompb.TimeSeries{ + &prompb.TimeSeries{ + Labels: []*prompb.Label{ + &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"}, + &prompb.Label{Name: "endpoint", Value: "http"}, + &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"}, + &prompb.Label{Name: "job", Value: "prom-kafka-writer"}, + &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"}, + &prompb.Label{Name: "namespace", Value: "edge1"}, + &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"}, + &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"}, + &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"}, + &prompb.Label{Name: "service", Value: "prom-kafka-writer"}, + }, + Samples: []prompb.Sample{ + prompb.Sample{ + Value: 17, + Timestamp: 1572479934007, + }, + prompb.Sample{ + Value: 19, + Timestamp: 1572480144007, + }, + }, + }, + } + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + buf = buf[0:cap(buf)] + } + compressed := snappy.Encode(buf, data) + return compressed, nil +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go new file mode 100644 index 00000000..fb78afe2 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func NewRouter() *mux.Router { + r := mux.NewRouter() + r.HandleFunc("/pkw", ListKWHandler).Methods("GET") + r.HandleFunc("/pkw", CreateKWHandler).Methods("POST") + r.HandleFunc("/pkw/{kwid}", DeleteKWHandler).Methods("DELETE") + r.HandleFunc("/pkw/{kwid}/receive", ReceiveKWHandler).Methods("POST") + + // Metrics Handler for prom-kafka-writer + r.Handle("/metrics", promhttp.Handler()) + return r +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go new file mode 100644 index 00000000..2a5921f1 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go @@ -0,0 +1,38 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "go.uber.org/zap" + "log" + "sync" +) + +var logOnce sync.Once +var slogger *zap.SugaredLogger + +//GetLoggerInstance returns a singleton instance of logger +func GetLoggerInstance() *zap.SugaredLogger { + logOnce.Do(func() { + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf("can't initialize zap logger: %v", err) + } + defer logger.Sync() + slogger = logger.Sugar() + }) + return slogger +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go new file mode 100644 index 00000000..f56f66aa --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go @@ -0,0 +1,129 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "strconv" + "sync" +) + +//KWConfig - serialized type for config related to Kafka +type KWConfig struct { + //Broker - Kafka Bootstrap servers (comma separated) + Broker string `json:"bootstrap.servers"` + //Topic - kafka topic name + Topic string `json:"topic"` + //UsePartition - Enforce use of partitions + UsePartition bool `json:"usePartition"` + BatchMsgNum int `json:"batch.num.messages,omitempty"` + Compression string `json:"compression.codec,omitempty"` +} + +//KWProducer - holds the Kafka Config and associated Kafka Producer +type KWProducer struct { + Config KWConfig + Producer *kafka.Producer +} + +//KWRespMap packs the KWConfig and kwid for List Api +type KWRespMap map[string]KWConfig + +//KWMap - Stores the Kafka Writer to Kafka Producer Mapping +// This is used to uniquely identify a Kafka Writer - Producer mapping. +var ( + KWMap = make(map[string]KWProducer) + kwMutex sync.Mutex + id int +) + +// NewKafkaWriter - creates a new producer using kafka config. +// Handles the remote write from prometheus and send to kafka topic +func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) { + producer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": kwc.Broker, + "compression.codec": kwc.Compression, + "batch.num.messages": kwc.BatchMsgNum, + "go.batch.producer": true, + "go.delivery.reports": false, + }) + if err != nil { + return nil, err + } + return producer, nil +} + +//NewKWConfig - creates a KWConfig object with default values +func NewKWConfig() *KWConfig { + return &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + } +} + +//NewKWRespMap - packs the KWConfig and kwid for List Api +func newKWRespMap() KWRespMap { + kwr := make(KWRespMap) + return kwr +} + +//AddKWC - Method to add KafkaWriterConfig request to KWMap +func AddKWC(kwc *KWConfig) (string, error) { + kwMutex.Lock() + defer kwMutex.Unlock() + //TODO: Generate kwid + kwid := "pkw" + strconv.Itoa(id) + id++ + producer, err := NewKafkaWriter(kwc) + if err != nil { + log.Error("Error", err) + id-- + return "", err + } + + KWMap[kwid] = KWProducer{ + Config: *kwc, + Producer: producer, + } + return kwid, nil +} + +//DeleteKWC - Method to add KafkaWriter request to KWMap +func DeleteKWC(kwid string) { + kwMutex.Lock() + defer kwMutex.Unlock() + if _, ok := KWMap[kwid]; ok { + KWMap[kwid].Producer.Close() + } + delete(KWMap, kwid) +} + +//ListKWC - Method to add KafkaWriter request to KWMap +func ListKWC() KWRespMap { + kwr := newKWRespMap() + for k, v := range KWMap { + kwr[k] = v.Config + } + return kwr +} + +//Cleanup - Method to cleanup resources +func Cleanup() { + for k := range KWMap { + DeleteKWC(k) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go new file mode 100644 index 00000000..6869452f --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go @@ -0,0 +1,229 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestNewKafkaWriter(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + + kwc2 := NewKWConfig() + kwc2.Broker = "localhost:9092" + kwc2.Topic = "metrics" + kwc2.BatchMsgNum = 0 + + tests := []struct { + name string + args args + want interface{} + }{ + { + name: "Test New Kafka Writer", + args: args{kwc}, + want: "rdkafka#producer-1", + }, + { + name: "Test New Kafka Writer Wrong Config", + args: args{kwc2}, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := NewKafkaWriter(tt.args.kwc) + if tt.want == nil { + assert.Equal(t, tt.want, nil) + } else { + if !reflect.DeepEqual(got.String(), tt.want) { + t.Errorf("NewKafkaWriter() = %v, want %v", got, tt.want) + } + } + }) + } +} + +func TestNewKWConfig(t *testing.T) { + tests := []struct { + name string + want *KWConfig + }{ + { + name: "Test New Kafka Config", + want: &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewKWConfig(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewKWConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAddKWC(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + tests := []struct { + name string + args args + want string + }{ + { + name: "Test Add Kafka Writer 1", + args: args{kwc}, + want: "pkw0", + }, + { + name: "Test Add Kafka Writer 2 ", + args: args{kwc}, + want: "pkw1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := AddKWC(tt.args.kwc) + assert.Equal(t, tt.want, got) + + }) + } + assert.Equal(t, 2, len(KWMap)) +} + +func TestDeleteKWC(t *testing.T) { + type args struct { + kwid string + } + tests := []struct { + name string + args args + delcount int + }{ + { + name: "Test Delete Kafka Writer 1", + args: args{"pkw0"}, + delcount: 1, + }, + { + name: "Test Delete Kafka Writer Non existent", + args: args{"pkw3"}, + delcount: 0, + }, + { + name: "Test Delete Kafka Writer 2", + args: args{"pkw1"}, + delcount: 1, + }, + } + for _, tt := range tests { + l := len(KWMap) + t.Run(tt.name, func(t *testing.T) { + DeleteKWC(tt.args.kwid) + }) + assert.Equal(t, l-tt.delcount, len(KWMap)) + } + assert.Equal(t, 0, len(KWMap)) +} + +func TestListKWC(t *testing.T) { + tests := []struct { + name string + init func() string + want KWRespMap + clean func(string) + }{ + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{"pkw2": { + Broker: "localhost:9092", + Topic: "metrics", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }}, + init: func() string { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + id, _ := AddKWC(kwc) + return id + }, + clean: func(id string) { + DeleteKWC(id) + }, + }, + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{}, + init: func() string { + return "" + }, + clean: func(string) {}, + }, + } + for _, tt := range tests { + id := tt.init() + t.Run(tt.name, func(t *testing.T) { + if got := ListKWC(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ListKWC() = %v, want %v", got, tt.want) + } + }) + tt.clean(id) + } +} + +func TestCleanup(t *testing.T) { + tests := []struct { + name string + init func() + }{ + { + name: "Test List Kafka Writers Empty", + init: func() { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + AddKWC(kwc) + AddKWC(kwc) + AddKWC(kwc) + }, + }, + } + for _, tt := range tests { + tt.init() + t.Run(tt.name, func(t *testing.T) { + Cleanup() + }) + assert.Equal(t, 0, len(KWMap)) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go new file mode 100644 index 00000000..42804321 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go @@ -0,0 +1,83 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "encoding/json" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + logger "prom-kafka-writer/pkg/config" +) + +var log = logger.GetLoggerInstance() + +func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error { + log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries) + for _, ts := range metrics.Timeseries { + m := make(model.Metric, len(ts.Labels)) + for _, l := range ts.Labels { + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + log.Debugw("Labels", "Labelset", m) + + for _, s := range ts.Samples { + log.Debugf(" %f %d\n", s.Value, s.Timestamp) + metric := map[string]interface{}{ + "name": m["__name__"], + "labels": m, + "timestamp": s.Timestamp, + "value": s.Value, + } + key := string(m["__name__"]) + jsonMetric, err := json.Marshal(metric) + if err != nil { + log.Errorw("Marshal error", "error", err.Error()) + continue + } + err = publish(kwid, key, jsonMetric) + if err != nil { + log.Error("Failed to produce message") + return err + } + } + } + return nil +} + +func publish(kwid string, key string, jsonMetric []byte) error { + var ( + kwp = KWMap[kwid].Producer + kwc = KWMap[kwid].Config + ) + + tp := getTopicPartition(kwc) + kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric} + err := kwp.Produce(&kwMsg, nil) + if err != nil { + log.Errorw("Kafka Producer Error", "error", err.Error()) + } + return err +} + +func getTopicPartition(kwc KWConfig) kafka.TopicPartition { + p := kafka.PartitionAny + if kwc.UsePartition { + // TODO: Implement partition strategy + p = kafka.PartitionAny + } + return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p} +} -- cgit 1.2.3-korg