diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go')
-rw-r--r-- | vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go new file mode 100644 index 00000000..19c4c0ab --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go @@ -0,0 +1,289 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "net/http/httptest" + "prom-kafka-writer/pkg/kafkawriter" + "testing" +) + +type errReader int + +func (errReader) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} + +func TestCreateKWHandler(t *testing.T) { + tests := []struct { + name string + body io.Reader + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test Create Kafka Writer", + body: bytes.NewBuffer([]byte(`{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusCreated, + expectResp: &kwResponse{ + KWid: "pkw0", + }, + }, + { + name: "Test Create Kafka Writer Wrong parameters", + body: bytes.NewBuffer([]byte(`{ + "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "kafkatopic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusUnprocessableEntity, + expectResp: &kwResponse{}, + }, + { + name: "Test Create Kafka Writer Empty Body", + body: bytes.NewBuffer([]byte(nil)), + expectStatus: http.StatusBadRequest, + expectResp: &kwResponse{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", "/pkw", tt.body) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestListKWHandler(t *testing.T) { + + tests := []struct { + name string + body string + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test List Kafka Writers", + body: `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }`, + expectStatus: http.StatusOK, + expectResp: &kwResponse{ + KWCRespMap: map[string]kafkawriter.KWConfig{ + "pkw0": { + Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + Topic: "adatopic1", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "snappy", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW("pkw0", tt.body) + req := httptest.NewRequest("GET", "/pkw", nil) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestDeleteKWHandler(t *testing.T) { + tests := []struct { + name string + kwid string + expectStatus int + }{ + { + name: "Test Delete Kafka Writer", + kwid: "pkw777", + expectStatus: http.StatusOK, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW(tt.kwid, body) + target := fmt.Sprintf("/pkw/%s", tt.kwid) + req := httptest.NewRequest("DELETE", target, nil) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func preCreateKW(kwid string, body string) { + kafkawriter.Cleanup() + k := []byte(body) + kwc := &kafkawriter.KWConfig{} + _ = json.Unmarshal(k, kwc) + producer, _ := kafkawriter.NewKafkaWriter(kwc) + kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer} +} + +func TestReceiveKWHandler(t *testing.T) { + f, err := buildRemoteWriteRequest() + if err != nil { + t.Fatal("Could not build prompb.WriteRequest") + } + tests := []struct { + name string + kwid string + body io.Reader + preCreate bool + expectStatus int + }{ + { + name: "Test Receive Messages Empty Message", + kwid: "pkw111", + preCreate: true, + expectStatus: http.StatusBadRequest, + }, + { + name: "Test Receive Messages", + kwid: "pkw111", + preCreate: true, + body: bytes.NewReader(f), + expectStatus: http.StatusOK, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw222", + preCreate: false, + expectStatus: http.StatusNotFound, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw111", + preCreate: true, + body: errReader(0), + expectStatus: http.StatusInternalServerError, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preCreate { + preCreateKW(tt.kwid, body) + } + target := fmt.Sprintf("/pkw/%s/receive", tt.kwid) + req := httptest.NewRequest("POST", target, tt.body) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func buildRemoteWriteRequest() ([]byte, error) { + var buf []byte + samples := []*prompb.TimeSeries{ + &prompb.TimeSeries{ + Labels: []*prompb.Label{ + &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"}, + &prompb.Label{Name: "endpoint", Value: "http"}, + &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"}, + &prompb.Label{Name: "job", Value: "prom-kafka-writer"}, + &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"}, + &prompb.Label{Name: "namespace", Value: "edge1"}, + &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"}, + &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"}, + &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"}, + &prompb.Label{Name: "service", Value: "prom-kafka-writer"}, + }, + Samples: []prompb.Sample{ + prompb.Sample{ + Value: 17, + Timestamp: 1572479934007, + }, + prompb.Sample{ + Value: 19, + Timestamp: 1572480144007, + }, + }, + }, + } + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + buf = buf[0:cap(buf)] + } + compressed := snappy.Encode(buf, data) + return compressed, nil +} |