diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go')
-rw-r--r-- | vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go new file mode 100644 index 00000000..d7a2b898 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go @@ -0,0 +1,143 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + + logger "prom-kafka-writer/pkg/config" + kw "prom-kafka-writer/pkg/kafkawriter" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/gorilla/mux" + "github.com/prometheus/prometheus/prompb" +) + +type kwResponse struct { + KWid string `json:"kwid,omitempty"` + KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"` +} + +var log = logger.GetLoggerInstance() + +// CreateKWHandler - Creates and starts a Prometheus to Kafka writer +func CreateKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for Creating Kafka Writer") + kwConfig := kw.NewKWConfig() + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + err := dec.Decode(kwConfig) + switch { + case err == io.EOF: + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + kwid, err := kw.AddKWC(kwConfig) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + kwResp := kwResponse{ + KWid: kwid, + } + err = json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ListKWHandler - Lists the KafkaWriters and its config +func ListKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for List Kafka Writers", "url", r.URL) + res := kw.ListKWC() + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + kwResp := kwResponse{ + KWCRespMap: res, + } + err := json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// DeleteKWHandler - Deletes a given Prometheus to Kafka writer +func DeleteKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"]) + kw.DeleteKWC(params["kwid"]) + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} + +// ReceiveKWHandler - Publish metrics from Prometheus to Kafka +func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + kwid := params["kwid"] + if _, ok := kw.KWMap[kwid]; !ok { + notRegisteredErr := errors.New("kafka writer not registered").Error() + log.Error(notRegisteredErr) + http.Error(w, notRegisteredErr, http.StatusNotFound) + return + } + log.Infow("Produce message on Kafka Writer", "kwid", kwid) + + compressed, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + metricBuf, err := snappy.Decode(nil, compressed) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var metrics prompb.WriteRequest + if err := proto.Unmarshal(metricBuf, &metrics); err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = kw.PublishTimeSeries(kwid, &metrics) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} |