diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api')
3 files changed, 465 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go new file mode 100644 index 00000000..d7a2b898 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go @@ -0,0 +1,143 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + + logger "prom-kafka-writer/pkg/config" + kw "prom-kafka-writer/pkg/kafkawriter" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/gorilla/mux" + "github.com/prometheus/prometheus/prompb" +) + +type kwResponse struct { + KWid string `json:"kwid,omitempty"` + KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"` +} + +var log = logger.GetLoggerInstance() + +// CreateKWHandler - Creates and starts a Prometheus to Kafka writer +func CreateKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for Creating Kafka Writer") + kwConfig := kw.NewKWConfig() + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + err := dec.Decode(kwConfig) + switch { + case err == io.EOF: + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + kwid, err := kw.AddKWC(kwConfig) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + kwResp := kwResponse{ + KWid: kwid, + } + err = json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ListKWHandler - Lists the KafkaWriters and its config +func ListKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for List Kafka Writers", "url", r.URL) + res := kw.ListKWC() + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + kwResp := kwResponse{ + KWCRespMap: res, + } + err := json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// DeleteKWHandler - Deletes a given Prometheus to Kafka writer +func DeleteKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"]) + kw.DeleteKWC(params["kwid"]) + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} + +// ReceiveKWHandler - Publish metrics from Prometheus to Kafka +func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + kwid := params["kwid"] + if _, ok := kw.KWMap[kwid]; !ok { + notRegisteredErr := errors.New("kafka writer not registered").Error() + log.Error(notRegisteredErr) + http.Error(w, notRegisteredErr, http.StatusNotFound) + return + } + log.Infow("Produce message on Kafka Writer", "kwid", kwid) + + compressed, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + metricBuf, err := snappy.Decode(nil, compressed) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var metrics prompb.WriteRequest + if err := proto.Unmarshal(metricBuf, &metrics); err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = kw.PublishTimeSeries(kwid, &metrics) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go new file mode 100644 index 00000000..19c4c0ab --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go @@ -0,0 +1,289 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "net/http/httptest" + "prom-kafka-writer/pkg/kafkawriter" + "testing" +) + +type errReader int + +func (errReader) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} + +func TestCreateKWHandler(t *testing.T) { + tests := []struct { + name string + body io.Reader + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test Create Kafka Writer", + body: bytes.NewBuffer([]byte(`{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusCreated, + expectResp: &kwResponse{ + KWid: "pkw0", + }, + }, + { + name: "Test Create Kafka Writer Wrong parameters", + body: bytes.NewBuffer([]byte(`{ + "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "kafkatopic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusUnprocessableEntity, + expectResp: &kwResponse{}, + }, + { + name: "Test Create Kafka Writer Empty Body", + body: bytes.NewBuffer([]byte(nil)), + expectStatus: http.StatusBadRequest, + expectResp: &kwResponse{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", "/pkw", tt.body) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestListKWHandler(t *testing.T) { + + tests := []struct { + name string + body string + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test List Kafka Writers", + body: `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }`, + expectStatus: http.StatusOK, + expectResp: &kwResponse{ + KWCRespMap: map[string]kafkawriter.KWConfig{ + "pkw0": { + Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + Topic: "adatopic1", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "snappy", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW("pkw0", tt.body) + req := httptest.NewRequest("GET", "/pkw", nil) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestDeleteKWHandler(t *testing.T) { + tests := []struct { + name string + kwid string + expectStatus int + }{ + { + name: "Test Delete Kafka Writer", + kwid: "pkw777", + expectStatus: http.StatusOK, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW(tt.kwid, body) + target := fmt.Sprintf("/pkw/%s", tt.kwid) + req := httptest.NewRequest("DELETE", target, nil) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func preCreateKW(kwid string, body string) { + kafkawriter.Cleanup() + k := []byte(body) + kwc := &kafkawriter.KWConfig{} + _ = json.Unmarshal(k, kwc) + producer, _ := kafkawriter.NewKafkaWriter(kwc) + kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer} +} + +func TestReceiveKWHandler(t *testing.T) { + f, err := buildRemoteWriteRequest() + if err != nil { + t.Fatal("Could not build prompb.WriteRequest") + } + tests := []struct { + name string + kwid string + body io.Reader + preCreate bool + expectStatus int + }{ + { + name: "Test Receive Messages Empty Message", + kwid: "pkw111", + preCreate: true, + expectStatus: http.StatusBadRequest, + }, + { + name: "Test Receive Messages", + kwid: "pkw111", + preCreate: true, + body: bytes.NewReader(f), + expectStatus: http.StatusOK, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw222", + preCreate: false, + expectStatus: http.StatusNotFound, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw111", + preCreate: true, + body: errReader(0), + expectStatus: http.StatusInternalServerError, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preCreate { + preCreateKW(tt.kwid, body) + } + target := fmt.Sprintf("/pkw/%s/receive", tt.kwid) + req := httptest.NewRequest("POST", target, tt.body) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func buildRemoteWriteRequest() ([]byte, error) { + var buf []byte + samples := []*prompb.TimeSeries{ + &prompb.TimeSeries{ + Labels: []*prompb.Label{ + &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"}, + &prompb.Label{Name: "endpoint", Value: "http"}, + &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"}, + &prompb.Label{Name: "job", Value: "prom-kafka-writer"}, + &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"}, + &prompb.Label{Name: "namespace", Value: "edge1"}, + &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"}, + &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"}, + &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"}, + &prompb.Label{Name: "service", Value: "prom-kafka-writer"}, + }, + Samples: []prompb.Sample{ + prompb.Sample{ + Value: 17, + Timestamp: 1572479934007, + }, + prompb.Sample{ + Value: 19, + Timestamp: 1572480144007, + }, + }, + }, + } + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + buf = buf[0:cap(buf)] + } + compressed := snappy.Encode(buf, data) + return compressed, nil +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go new file mode 100644 index 00000000..fb78afe2 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func NewRouter() *mux.Router { + r := mux.NewRouter() + r.HandleFunc("/pkw", ListKWHandler).Methods("GET") + r.HandleFunc("/pkw", CreateKWHandler).Methods("POST") + r.HandleFunc("/pkw/{kwid}", DeleteKWHandler).Methods("DELETE") + r.HandleFunc("/pkw/{kwid}/receive", ReceiveKWHandler).Methods("POST") + + // Metrics Handler for prom-kafka-writer + r.Handle("/metrics", promhttp.Handler()) + return r +} |