diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter')
3 files changed, 441 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go new file mode 100644 index 00000000..f56f66aa --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go @@ -0,0 +1,129 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "strconv" + "sync" +) + +//KWConfig - serialized type for config related to Kafka +type KWConfig struct { + //Broker - Kafka Bootstrap servers (comma separated) + Broker string `json:"bootstrap.servers"` + //Topic - kafka topic name + Topic string `json:"topic"` + //UsePartition - Enforce use of partitions + UsePartition bool `json:"usePartition"` + BatchMsgNum int `json:"batch.num.messages,omitempty"` + Compression string `json:"compression.codec,omitempty"` +} + +//KWProducer - holds the Kafka Config and associated Kafka Producer +type KWProducer struct { + Config KWConfig + Producer *kafka.Producer +} + +//KWRespMap packs the KWConfig and kwid for List Api +type KWRespMap map[string]KWConfig + +//KWMap - Stores the Kafka Writer to Kafka Producer Mapping +// This is used to uniquely identify a Kafka Writer - Producer mapping. +var ( + KWMap = make(map[string]KWProducer) + kwMutex sync.Mutex + id int +) + +// NewKafkaWriter - creates a new producer using kafka config. +// Handles the remote write from prometheus and send to kafka topic +func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) { + producer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": kwc.Broker, + "compression.codec": kwc.Compression, + "batch.num.messages": kwc.BatchMsgNum, + "go.batch.producer": true, + "go.delivery.reports": false, + }) + if err != nil { + return nil, err + } + return producer, nil +} + +//NewKWConfig - creates a KWConfig object with default values +func NewKWConfig() *KWConfig { + return &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + } +} + +//NewKWRespMap - packs the KWConfig and kwid for List Api +func newKWRespMap() KWRespMap { + kwr := make(KWRespMap) + return kwr +} + +//AddKWC - Method to add KafkaWriterConfig request to KWMap +func AddKWC(kwc *KWConfig) (string, error) { + kwMutex.Lock() + defer kwMutex.Unlock() + //TODO: Generate kwid + kwid := "pkw" + strconv.Itoa(id) + id++ + producer, err := NewKafkaWriter(kwc) + if err != nil { + log.Error("Error", err) + id-- + return "", err + } + + KWMap[kwid] = KWProducer{ + Config: *kwc, + Producer: producer, + } + return kwid, nil +} + +//DeleteKWC - Method to add KafkaWriter request to KWMap +func DeleteKWC(kwid string) { + kwMutex.Lock() + defer kwMutex.Unlock() + if _, ok := KWMap[kwid]; ok { + KWMap[kwid].Producer.Close() + } + delete(KWMap, kwid) +} + +//ListKWC - Method to add KafkaWriter request to KWMap +func ListKWC() KWRespMap { + kwr := newKWRespMap() + for k, v := range KWMap { + kwr[k] = v.Config + } + return kwr +} + +//Cleanup - Method to cleanup resources +func Cleanup() { + for k := range KWMap { + DeleteKWC(k) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go new file mode 100644 index 00000000..6869452f --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go @@ -0,0 +1,229 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestNewKafkaWriter(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + + kwc2 := NewKWConfig() + kwc2.Broker = "localhost:9092" + kwc2.Topic = "metrics" + kwc2.BatchMsgNum = 0 + + tests := []struct { + name string + args args + want interface{} + }{ + { + name: "Test New Kafka Writer", + args: args{kwc}, + want: "rdkafka#producer-1", + }, + { + name: "Test New Kafka Writer Wrong Config", + args: args{kwc2}, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := NewKafkaWriter(tt.args.kwc) + if tt.want == nil { + assert.Equal(t, tt.want, nil) + } else { + if !reflect.DeepEqual(got.String(), tt.want) { + t.Errorf("NewKafkaWriter() = %v, want %v", got, tt.want) + } + } + }) + } +} + +func TestNewKWConfig(t *testing.T) { + tests := []struct { + name string + want *KWConfig + }{ + { + name: "Test New Kafka Config", + want: &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewKWConfig(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewKWConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAddKWC(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + tests := []struct { + name string + args args + want string + }{ + { + name: "Test Add Kafka Writer 1", + args: args{kwc}, + want: "pkw0", + }, + { + name: "Test Add Kafka Writer 2 ", + args: args{kwc}, + want: "pkw1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := AddKWC(tt.args.kwc) + assert.Equal(t, tt.want, got) + + }) + } + assert.Equal(t, 2, len(KWMap)) +} + +func TestDeleteKWC(t *testing.T) { + type args struct { + kwid string + } + tests := []struct { + name string + args args + delcount int + }{ + { + name: "Test Delete Kafka Writer 1", + args: args{"pkw0"}, + delcount: 1, + }, + { + name: "Test Delete Kafka Writer Non existent", + args: args{"pkw3"}, + delcount: 0, + }, + { + name: "Test Delete Kafka Writer 2", + args: args{"pkw1"}, + delcount: 1, + }, + } + for _, tt := range tests { + l := len(KWMap) + t.Run(tt.name, func(t *testing.T) { + DeleteKWC(tt.args.kwid) + }) + assert.Equal(t, l-tt.delcount, len(KWMap)) + } + assert.Equal(t, 0, len(KWMap)) +} + +func TestListKWC(t *testing.T) { + tests := []struct { + name string + init func() string + want KWRespMap + clean func(string) + }{ + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{"pkw2": { + Broker: "localhost:9092", + Topic: "metrics", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }}, + init: func() string { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + id, _ := AddKWC(kwc) + return id + }, + clean: func(id string) { + DeleteKWC(id) + }, + }, + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{}, + init: func() string { + return "" + }, + clean: func(string) {}, + }, + } + for _, tt := range tests { + id := tt.init() + t.Run(tt.name, func(t *testing.T) { + if got := ListKWC(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ListKWC() = %v, want %v", got, tt.want) + } + }) + tt.clean(id) + } +} + +func TestCleanup(t *testing.T) { + tests := []struct { + name string + init func() + }{ + { + name: "Test List Kafka Writers Empty", + init: func() { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + AddKWC(kwc) + AddKWC(kwc) + AddKWC(kwc) + }, + }, + } + for _, tt := range tests { + tt.init() + t.Run(tt.name, func(t *testing.T) { + Cleanup() + }) + assert.Equal(t, 0, len(KWMap)) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go new file mode 100644 index 00000000..42804321 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go @@ -0,0 +1,83 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "encoding/json" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + logger "prom-kafka-writer/pkg/config" +) + +var log = logger.GetLoggerInstance() + +func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error { + log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries) + for _, ts := range metrics.Timeseries { + m := make(model.Metric, len(ts.Labels)) + for _, l := range ts.Labels { + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + log.Debugw("Labels", "Labelset", m) + + for _, s := range ts.Samples { + log.Debugf(" %f %d\n", s.Value, s.Timestamp) + metric := map[string]interface{}{ + "name": m["__name__"], + "labels": m, + "timestamp": s.Timestamp, + "value": s.Value, + } + key := string(m["__name__"]) + jsonMetric, err := json.Marshal(metric) + if err != nil { + log.Errorw("Marshal error", "error", err.Error()) + continue + } + err = publish(kwid, key, jsonMetric) + if err != nil { + log.Error("Failed to produce message") + return err + } + } + } + return nil +} + +func publish(kwid string, key string, jsonMetric []byte) error { + var ( + kwp = KWMap[kwid].Producer + kwc = KWMap[kwid].Config + ) + + tp := getTopicPartition(kwc) + kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric} + err := kwp.Produce(&kwMsg, nil) + if err != nil { + log.Errorw("Kafka Producer Error", "error", err.Error()) + } + return err +} + +func getTopicPartition(kwc KWConfig) kafka.TopicPartition { + p := kafka.PartitionAny + if kwc.UsePartition { + // TODO: Implement partition strategy + p = kafka.PartitionAny + } + return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p} +} |