diff options
author | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-10-31 15:46:37 -0700 |
---|---|---|
committer | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-11-11 14:39:44 -0800 |
commit | d11cb99fb101a6798fd57fa44d332737d6637e75 (patch) | |
tree | 0d7ae93ac67186a9bd4dd81db85eab9ccd64334f /vnfs | |
parent | 68d118176bb53c36b31a7060cfa16ad5acac1765 (diff) |
Prometheus Kafka Writer Microservice
This patch implents Prometheus to remote Kafka Writer Microservice
Added sample day-2 config to configure prometheus
PS4: Fixed Review comments
Issue-ID: ONAPARC-393
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Change-Id: I0bc77175593a165effd7bb1cb4802c988a5ef4ec
Diffstat (limited to 'vnfs')
20 files changed, 1582 insertions, 0 deletions
diff --git a/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml b/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml new file mode 100644 index 00000000..090e2a1e --- /dev/null +++ b/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml @@ -0,0 +1,10 @@ +spec: + remoteWrite: + - url: "http://prom-kafka-writer.edge1.svc.cluster.local:8686/pkw/pkw0/receive" + writeRelabelConfigs: + - targetLabel: metrics_storage + replacement: kafka_remote + - action: keep + regex: go_gc_duration_seconds_count + sourceLabels: + - __name__ diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore b/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore new file mode 100644 index 00000000..ec6f4674 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore @@ -0,0 +1,99 @@ +# Common +.DS_Store +.vscode +*-workspace +.tox/ +.*.swp +*.log +coverage.html +docs/build +*.so + +# Tests +*.test +*.out + +# # Directories +target +vendor +src/github.com +src/golang.org + +# Temporary Build Files +build/_output +build/_test +# Created by https://www.gitignore.io/api/go,vim,emacs,visualstudiocode +### Emacs ### +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* +# Org-mode +.org-id-locations +*_archive +# flymake-mode +*_flymake.* +# eshell files +/eshell/history +/eshell/lastdir +# elpa packages +/elpa/ +# reftex files +*.rel +# AUCTeX auto folder +/auto/ +# cask packages +.cask/ +dist/ +# Flycheck +flycheck_*.el +# server auth directory +/server/ +# projectiles files +.projectile +projectile-bookmarks.eld +# directory configuration +.dir-locals.el +# saveplace +places +# url cache +url/cache/ +# cedet +ede-projects.el +# smex +smex-items +# company-statistics +company-statistics-cache.el +# anaconda-mode +anaconda-mode/ +### Go ### +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +# Test binary, build with 'go test -c' +*.test +# Output of the go coverage tool, specifically when used with LiteIDE +*.out +### Vim ### +# swap +.sw[a-p] +.*.sw[a-p] +# session +Session.vim +# temporary +.netrwhist +# auto-generated tag files +tags +### VisualStudioCode ### +.vscode/* +.history +# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode +bin
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile new file mode 100644 index 00000000..dc8f5fe2 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile @@ -0,0 +1,36 @@ +# Use base golang image from Docker Hub +FROM golang:1.12.7 + +# Download the dlv (delve) debugger for go (you can comment this out if unused) +RUN go get -u -v github.com/go-delve/delve/cmd/dlv + +WORKDIR /src/prom-kafka-writer + +RUN mkdir /librdkafka-dir && cd /librdkafka-dir +RUN git clone https://github.com/edenhill/librdkafka.git && \ +cd librdkafka && \ +./configure --prefix /usr && \ +make && \ +make install + +# Install dependencies in go.mod and go.sum +COPY go.mod go.sum ./ +RUN go mod download + +# Copy rest of the application source code +COPY . ./ + +# Compile the application to /app. +RUN go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer + +# If you want to use the debugger, you need to modify the entrypoint to the +# container and point it to the "dlv debug" command: +# * UNCOMMENT the following ENTRYPOINT statement, +# * COMMENT OUT the last ENTRYPOINT statement +# Start the "dlv debug" server on port 3000 of the container. +ENTRYPOINT ["dlv", "exec", "./bin/prom-kafka-writer", "--continue", "--accept-multiclient", "--api-version=2", "--headless", "--listen=:3000", "--log"] + +# If you want to run WITHOUT the debugging server: +# * COMMENT OUT the previous ENTRYPOINT statements, +# * UNCOMMENT the following ENTRYPOINT statement. +# ENTRYPOINT ["/prom-kafka-writer"] diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci new file mode 100644 index 00000000..a135dbbd --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci @@ -0,0 +1,43 @@ +# Use base golang image from Docker Hub +FROM golang:1.12.7 as builder + +# Download the dlv (delve) debugger for go (you can comment this out if unused) +RUN go get -u -v github.com/go-delve/delve/cmd/dlv + +WORKDIR /src/prom-kafka-writer + +RUN mkdir /librdkafka-dir && cd /librdkafka-dir +RUN git clone https://github.com/edenhill/librdkafka.git && \ +cd librdkafka && \ +./configure --prefix /usr && \ +make && \ +make install + +# Install dependencies in go.mod and go.sum +COPY go.mod go.sum ./ +RUN go mod download + +# Copy rest of the application source code +COPY . ./ + +# Compile the application to /app. +RUN go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer + +# If you want to use the debugger, you need to modify the entrypoint to the +# container and point it to the "dlv debug" command: +# * UNCOMMENT the following ENTRYPOINT statement, +# * COMMENT OUT the last ENTRYPOINT statement +# Start the "dlv debug" server on port 3000 of the container. +#ENTRYPOINT ["dlv", "exec", "./bin/prom-kafka-writer", "--continue", "--accept-multiclient", "--api-version=2", "--headless", "--listen=:3000", "--log"] + +# If you want to run WITHOUT the debugging server: +# * COMMENT OUT the previous ENTRYPOINT statements, +# * UNCOMMENT the following ENTRYPOINT statement. + +# final stage +FROM ubuntu:18.04 +COPY --from=builder /usr/lib/pkgconfig /usr/lib/pkgconfig +COPY --from=builder /usr/lib/librdkafka* /usr/lib/ +COPY --from=builder //src/prom-kafka-writer/* /prom-kafka-writer/ +WORKDIR /prom-kafka-writer +ENTRYPOINT ["./bin/prom-kafka-writer"] diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile b/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile new file mode 100644 index 00000000..5d8ba317 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile @@ -0,0 +1,73 @@ +# SPDX-license-identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2019 Intel Corporation +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +# Common +VERSION := $(shell git describe --tags) +BUILD := $(shell git rev-parse --short HEAD) +PROJECTNAME := $(shell basename "$(PWD)") + +ROOTPATH := $(shell realpath "$(PWD)/../") +PKA := prom-kafka-writer +ifndef IMAGE_NAME +override IMAGE_NAME := dcr.cluster.local:32644/prom-kafka-writer:latest +endif + +export GO111MODULE=on + +.PHONY: clean plugins + +## build: Build the binary for prom-kafka-writer +build: clean format + GOOS=linux GOARCH=amd64 + @go mod download + @go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer + +## all: Delete the image, binary, complete build, test and run coverage +all: build test cover +## debug: Build local binary for debugging +debug: + @echo "Use Cloud Code VSCode plugin or Skaffold to debug" + +## deploy: Build Dockerfile and publish to repository +deploy: build test publish + +## publish: Push docker image to repository +publish: + @docker push ${IMAGE_NAME} +.PHONY: test +## test: run tests +test: clean + @go test -v ./... +## format: format source code +format: + @go fmt ./... + +## clean: clean build artifacts, image, binary +clean: format + @echo "Deleting the prom-kafka-writer binary" + @rm -rf ${PKA} + @echo "Deleting the prom-kafka-writer docker image" + @docker 2>/dev/null rmi ${IMAGE_NAME} | true + +.PHONY: cover +## cover: run tests and generate coverage report +cover: + @go test ./... -coverprofile=coverage.out + @go tool cover -html=coverage.out -o coverage.html + +.PHONY: help +## help: Print help message +help: Makefile + @echo + @echo " Requires librdkafka v1.1.0 or later, go1.12.5+" + @echo + @echo " Choose a command run in "$(PROJECTNAME)":" + @echo + @sed -n 's/^## //p' $< | column -t -s ':' | sed -e 's/^/ /' + @echo diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/README.md b/vnfs/DAaaS/microservices/prom-kafka-writer/README.md new file mode 100644 index 00000000..8d981e1d --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/README.md @@ -0,0 +1,66 @@ +# Prometheus Kafka Remote Writer + +![Architecture Diagram](./diagram.png) + +Prometheus Kafka remote writer is a remote writer/adapter to publish metrics from prometheus to a remote storage like kafka. +The prom-kafka-writer microservice enables to register the remote endpoint and configures the kafka producer endpoint. +Prometheus can then be configured to set the remote write to this endpoint and the metrics can be published. + +---- +## Install +### Build docker images +```bash +git clone https://github.com/onap/demo.git +DA_WORKING_DIR=$PWD/demo/vnfs/DAaaS +cd $DA_WORKING_DIR/microservices/prom-kafka-writer +docker build -f Dockerfile.ci . -t prom-kafka-writer:latest +``` +### Deploy using Helm chart +```bash +cd $DA_WORKING_DIR/deploy/collection/charts/prom-kafka-writer + +# Modify the values.yaml +helm install -n pkw . -f values.yaml --namespace=edge1 +``` +Verify the deployment +```bash +kubectl get pods -n edge1 | grep prom-kafka-writer + +NAME READY STATUS RESTARTS AGE +pkw-prom-kafka-writer-c6c89c579-jv6jt 1/1 Running 0 11m +``` + +## API +* Create Prometheus Kafka Writer endpoint + +```bash + //Request + curl -X POST \ + http://prom-kafka-writer/pkw \ + -H 'Content-Type: application/json' \ + -d '{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9091", + "topic": "topic1", + "usePartition": false, + "compression.codec": "snappy" + }' +``` + +* Delete Prometheus Kafka Writer endpoint + +```bash + //Request + curl -X DELETE http://prom-kafka-writer/pkw/pkw0 +``` + +* List Prometheus Kafka Writers +```bash + //Request + curl -X GET http://prom-kafka-writer/pkw +``` + +* Publish to Kafka Writer +```bash + //Request + curl -X POST http://prom-kafka-writer/pkw/pkw0/receive +``` diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go b/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go new file mode 100644 index 00000000..08dc38b5 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go @@ -0,0 +1,71 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "context" + "net/http" + "os" + "os/signal" + "time" + + "prom-kafka-writer/pkg/api" + logger "prom-kafka-writer/pkg/config" + kw "prom-kafka-writer/pkg/kafkawriter" +) + +const defaultAddr = ":8686" + +// main starts an http server on the $PORT environment variable. +func main() { + log := logger.GetLoggerInstance() + + addr := defaultAddr + // $PORT environment variable is provided in the Kubernetes deployment. + if p := os.Getenv("PORT"); p != "" { + addr = ":" + p + } + + log.Infow("Starting Prometheus Kafka writer", "addr", addr) + defer log.Infow("Prometheus Kafka writer Terminated") + + s := &http.Server{ + Handler: api.NewRouter(), + Addr: addr, + } + + // shutdown hook. Wait for clean up if the pod/container is killed + shutdownChannel := make(chan struct{}) + go func() { + log.Debug("msg", "Creating shutdown hooks") + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + <-sigChan + log.Debug("msg", "Received os.Interrupt") + log.Debug("msg", "Initiate cleanup") + //TODO: Cleanup here + kw.Cleanup() + time.Sleep(time.Second * 3) + _ = s.Shutdown(context.Background()) + close(shutdownChannel) + }() + + err := s.ListenAndServe() + if err != nil { + log.Fatalw("Server Error - Shutting down", "error", err) + } + <-shutdownChannel +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png b/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png Binary files differnew file mode 100644 index 00000000..14cff60e --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod b/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod new file mode 100644 index 00000000..3c0be213 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod @@ -0,0 +1,18 @@ +module prom-kafka-writer + +go 1.13 + +require ( + github.com/golang/protobuf v1.3.2 + github.com/golang/snappy v0.0.1 + github.com/gorilla/mux v1.7.3 + github.com/grpc-ecosystem/grpc-gateway v1.11.3 // indirect + github.com/prometheus/client_golang v1.2.1 + github.com/prometheus/common v0.7.0 + github.com/prometheus/prometheus v2.5.0+incompatible + github.com/stretchr/testify v1.4.0 + go.uber.org/zap v1.12.0 + google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 // indirect + google.golang.org/grpc v1.24.0 // indirect + gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 +) diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum b/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum new file mode 100644 index 00000000..eaf935dd --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum @@ -0,0 +1,159 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA= +github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/grpc-ecosystem/grpc-gateway v1.11.3 h1:h8+NsYENhxNTuq+dobk3+ODoJtwY4Fu0WQXsxJfL8aM= +github.com/grpc-ecosystem/grpc-gateway v1.11.3/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.2.1 h1:JnMpQc6ppsNgw9QPAGF6Dod479itz7lvlsMzzNayLOI= +github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= +github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= +github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg= +github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= +go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw= +go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 h1:UXl+Zk3jqqcbEVV7ace5lrt4YdA4tXiz3f/KbmD29Vo= +google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s= +google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml new file mode 100644 index 00000000..203c8195 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml @@ -0,0 +1,27 @@ +# This Deployment manifest defines: +# - single-replica deployment of the container image, with label "app: go-prom-kafka-writer" +# - Pod exposes port 8686 +# - specify PORT environment variable to the container process +# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: prom-kafka-writer +spec: + replicas: 1 + selector: + matchLabels: + app: prom-kafka-writer + template: + metadata: + labels: + app: prom-kafka-writer + spec: + containers: + - name: server + image: dcr.cluster.local:32644/prom-kafka-writer + ports: + - containerPort: 8686 + env: + - name: PORT + value: "8686" diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml new file mode 100644 index 00000000..f63ef53e --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml @@ -0,0 +1,20 @@ +# This Service manifest defines: +# - a load balancer for pods matching label "app: go-prom-kafka-writer" +# - exposing the application to the public Internet (type:LoadBalancer) +# - routes port 80 of the load balancer to the port 8686 of the Pods. +# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/ +apiVersion: v1 +kind: Service +metadata: + name: prom-kafka-writer + labels: + app: prom-kafka-writer +spec: + type: NodePort + selector: + app: prom-kafka-writer + ports: + - name: http + port: 80 + targetPort: 8686 + nodePort: 30086 diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go new file mode 100644 index 00000000..d7a2b898 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go @@ -0,0 +1,143 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + + logger "prom-kafka-writer/pkg/config" + kw "prom-kafka-writer/pkg/kafkawriter" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/gorilla/mux" + "github.com/prometheus/prometheus/prompb" +) + +type kwResponse struct { + KWid string `json:"kwid,omitempty"` + KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"` +} + +var log = logger.GetLoggerInstance() + +// CreateKWHandler - Creates and starts a Prometheus to Kafka writer +func CreateKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for Creating Kafka Writer") + kwConfig := kw.NewKWConfig() + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + err := dec.Decode(kwConfig) + switch { + case err == io.EOF: + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + kwid, err := kw.AddKWC(kwConfig) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + kwResp := kwResponse{ + KWid: kwid, + } + err = json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ListKWHandler - Lists the KafkaWriters and its config +func ListKWHandler(w http.ResponseWriter, r *http.Request) { + log.Infow("Received request for List Kafka Writers", "url", r.URL) + res := kw.ListKWC() + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + kwResp := kwResponse{ + KWCRespMap: res, + } + err := json.NewEncoder(w).Encode(kwResp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// DeleteKWHandler - Deletes a given Prometheus to Kafka writer +func DeleteKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"]) + kw.DeleteKWC(params["kwid"]) + + //Send response back + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} + +// ReceiveKWHandler - Publish metrics from Prometheus to Kafka +func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + kwid := params["kwid"] + if _, ok := kw.KWMap[kwid]; !ok { + notRegisteredErr := errors.New("kafka writer not registered").Error() + log.Error(notRegisteredErr) + http.Error(w, notRegisteredErr, http.StatusNotFound) + return + } + log.Infow("Produce message on Kafka Writer", "kwid", kwid) + + compressed, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + metricBuf, err := snappy.Decode(nil, compressed) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var metrics prompb.WriteRequest + if err := proto.Unmarshal(metricBuf, &metrics); err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = kw.PublishTimeSeries(kwid, &metrics) + if err != nil { + log.Error("error", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go new file mode 100644 index 00000000..19c4c0ab --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go @@ -0,0 +1,289 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "net/http/httptest" + "prom-kafka-writer/pkg/kafkawriter" + "testing" +) + +type errReader int + +func (errReader) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} + +func TestCreateKWHandler(t *testing.T) { + tests := []struct { + name string + body io.Reader + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test Create Kafka Writer", + body: bytes.NewBuffer([]byte(`{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusCreated, + expectResp: &kwResponse{ + KWid: "pkw0", + }, + }, + { + name: "Test Create Kafka Writer Wrong parameters", + body: bytes.NewBuffer([]byte(`{ + "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "kafkatopic": "adatopic1", + "usePartition": false, + "compression.codec": "snappy" + }`)), + expectStatus: http.StatusUnprocessableEntity, + expectResp: &kwResponse{}, + }, + { + name: "Test Create Kafka Writer Empty Body", + body: bytes.NewBuffer([]byte(nil)), + expectStatus: http.StatusBadRequest, + expectResp: &kwResponse{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("POST", "/pkw", tt.body) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestListKWHandler(t *testing.T) { + + tests := []struct { + name string + body string + expectStatus int + expectResp *kwResponse + }{ + { + name: "Test List Kafka Writers", + body: `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }`, + expectStatus: http.StatusOK, + expectResp: &kwResponse{ + KWCRespMap: map[string]kafkawriter.KWConfig{ + "pkw0": { + Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + Topic: "adatopic1", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "snappy", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW("pkw0", tt.body) + req := httptest.NewRequest("GET", "/pkw", nil) + rec := httptest.NewRecorder() + r := NewRouter() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + kwResp := &kwResponse{} + json.NewDecoder(resp.Body).Decode(&kwResp) + assert.Equal(t, tt.expectResp, kwResp) + }) + } +} + +func TestDeleteKWHandler(t *testing.T) { + tests := []struct { + name string + kwid string + expectStatus int + }{ + { + name: "Test Delete Kafka Writer", + kwid: "pkw777", + expectStatus: http.StatusOK, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preCreateKW(tt.kwid, body) + target := fmt.Sprintf("/pkw/%s", tt.kwid) + req := httptest.NewRequest("DELETE", target, nil) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func preCreateKW(kwid string, body string) { + kafkawriter.Cleanup() + k := []byte(body) + kwc := &kafkawriter.KWConfig{} + _ = json.Unmarshal(k, kwc) + producer, _ := kafkawriter.NewKafkaWriter(kwc) + kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer} +} + +func TestReceiveKWHandler(t *testing.T) { + f, err := buildRemoteWriteRequest() + if err != nil { + t.Fatal("Could not build prompb.WriteRequest") + } + tests := []struct { + name string + kwid string + body io.Reader + preCreate bool + expectStatus int + }{ + { + name: "Test Receive Messages Empty Message", + kwid: "pkw111", + preCreate: true, + expectStatus: http.StatusBadRequest, + }, + { + name: "Test Receive Messages", + kwid: "pkw111", + preCreate: true, + body: bytes.NewReader(f), + expectStatus: http.StatusOK, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw222", + preCreate: false, + expectStatus: http.StatusNotFound, + }, + { + name: "Test Receive Messages Kafka Writer Not registed", + kwid: "pkw111", + preCreate: true, + body: errReader(0), + expectStatus: http.StatusInternalServerError, + }, + } + body := `{ + "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092", + "topic": "adatopic1", + "usePartition": false, + "batch.num.messages": 10000, + "compression.codec": "snappy" + }` + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preCreate { + preCreateKW(tt.kwid, body) + } + target := fmt.Sprintf("/pkw/%s/receive", tt.kwid) + req := httptest.NewRequest("POST", target, tt.body) + r := NewRouter() + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + resp := rec.Result() + assert.Equal(t, tt.expectStatus, resp.StatusCode) + }) + } +} + +func buildRemoteWriteRequest() ([]byte, error) { + var buf []byte + samples := []*prompb.TimeSeries{ + &prompb.TimeSeries{ + Labels: []*prompb.Label{ + &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"}, + &prompb.Label{Name: "endpoint", Value: "http"}, + &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"}, + &prompb.Label{Name: "job", Value: "prom-kafka-writer"}, + &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"}, + &prompb.Label{Name: "namespace", Value: "edge1"}, + &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"}, + &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"}, + &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"}, + &prompb.Label{Name: "service", Value: "prom-kafka-writer"}, + }, + Samples: []prompb.Sample{ + prompb.Sample{ + Value: 17, + Timestamp: 1572479934007, + }, + prompb.Sample{ + Value: 19, + Timestamp: 1572480144007, + }, + }, + }, + } + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + + // snappy uses len() to see if it needs to allocate a new slice. Make the + // buffer as long as possible. + if buf != nil { + buf = buf[0:cap(buf)] + } + compressed := snappy.Encode(buf, data) + return compressed, nil +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go new file mode 100644 index 00000000..fb78afe2 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +import ( + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func NewRouter() *mux.Router { + r := mux.NewRouter() + r.HandleFunc("/pkw", ListKWHandler).Methods("GET") + r.HandleFunc("/pkw", CreateKWHandler).Methods("POST") + r.HandleFunc("/pkw/{kwid}", DeleteKWHandler).Methods("DELETE") + r.HandleFunc("/pkw/{kwid}/receive", ReceiveKWHandler).Methods("POST") + + // Metrics Handler for prom-kafka-writer + r.Handle("/metrics", promhttp.Handler()) + return r +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go new file mode 100644 index 00000000..2a5921f1 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go @@ -0,0 +1,38 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "go.uber.org/zap" + "log" + "sync" +) + +var logOnce sync.Once +var slogger *zap.SugaredLogger + +//GetLoggerInstance returns a singleton instance of logger +func GetLoggerInstance() *zap.SugaredLogger { + logOnce.Do(func() { + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf("can't initialize zap logger: %v", err) + } + defer logger.Sync() + slogger = logger.Sugar() + }) + return slogger +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go new file mode 100644 index 00000000..f56f66aa --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go @@ -0,0 +1,129 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "strconv" + "sync" +) + +//KWConfig - serialized type for config related to Kafka +type KWConfig struct { + //Broker - Kafka Bootstrap servers (comma separated) + Broker string `json:"bootstrap.servers"` + //Topic - kafka topic name + Topic string `json:"topic"` + //UsePartition - Enforce use of partitions + UsePartition bool `json:"usePartition"` + BatchMsgNum int `json:"batch.num.messages,omitempty"` + Compression string `json:"compression.codec,omitempty"` +} + +//KWProducer - holds the Kafka Config and associated Kafka Producer +type KWProducer struct { + Config KWConfig + Producer *kafka.Producer +} + +//KWRespMap packs the KWConfig and kwid for List Api +type KWRespMap map[string]KWConfig + +//KWMap - Stores the Kafka Writer to Kafka Producer Mapping +// This is used to uniquely identify a Kafka Writer - Producer mapping. +var ( + KWMap = make(map[string]KWProducer) + kwMutex sync.Mutex + id int +) + +// NewKafkaWriter - creates a new producer using kafka config. +// Handles the remote write from prometheus and send to kafka topic +func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) { + producer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": kwc.Broker, + "compression.codec": kwc.Compression, + "batch.num.messages": kwc.BatchMsgNum, + "go.batch.producer": true, + "go.delivery.reports": false, + }) + if err != nil { + return nil, err + } + return producer, nil +} + +//NewKWConfig - creates a KWConfig object with default values +func NewKWConfig() *KWConfig { + return &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + } +} + +//NewKWRespMap - packs the KWConfig and kwid for List Api +func newKWRespMap() KWRespMap { + kwr := make(KWRespMap) + return kwr +} + +//AddKWC - Method to add KafkaWriterConfig request to KWMap +func AddKWC(kwc *KWConfig) (string, error) { + kwMutex.Lock() + defer kwMutex.Unlock() + //TODO: Generate kwid + kwid := "pkw" + strconv.Itoa(id) + id++ + producer, err := NewKafkaWriter(kwc) + if err != nil { + log.Error("Error", err) + id-- + return "", err + } + + KWMap[kwid] = KWProducer{ + Config: *kwc, + Producer: producer, + } + return kwid, nil +} + +//DeleteKWC - Method to add KafkaWriter request to KWMap +func DeleteKWC(kwid string) { + kwMutex.Lock() + defer kwMutex.Unlock() + if _, ok := KWMap[kwid]; ok { + KWMap[kwid].Producer.Close() + } + delete(KWMap, kwid) +} + +//ListKWC - Method to add KafkaWriter request to KWMap +func ListKWC() KWRespMap { + kwr := newKWRespMap() + for k, v := range KWMap { + kwr[k] = v.Config + } + return kwr +} + +//Cleanup - Method to cleanup resources +func Cleanup() { + for k := range KWMap { + DeleteKWC(k) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go new file mode 100644 index 00000000..6869452f --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go @@ -0,0 +1,229 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestNewKafkaWriter(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + + kwc2 := NewKWConfig() + kwc2.Broker = "localhost:9092" + kwc2.Topic = "metrics" + kwc2.BatchMsgNum = 0 + + tests := []struct { + name string + args args + want interface{} + }{ + { + name: "Test New Kafka Writer", + args: args{kwc}, + want: "rdkafka#producer-1", + }, + { + name: "Test New Kafka Writer Wrong Config", + args: args{kwc2}, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := NewKafkaWriter(tt.args.kwc) + if tt.want == nil { + assert.Equal(t, tt.want, nil) + } else { + if !reflect.DeepEqual(got.String(), tt.want) { + t.Errorf("NewKafkaWriter() = %v, want %v", got, tt.want) + } + } + }) + } +} + +func TestNewKWConfig(t *testing.T) { + tests := []struct { + name string + want *KWConfig + }{ + { + name: "Test New Kafka Config", + want: &KWConfig{ + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewKWConfig(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewKWConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAddKWC(t *testing.T) { + type args struct { + kwc *KWConfig + } + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + tests := []struct { + name string + args args + want string + }{ + { + name: "Test Add Kafka Writer 1", + args: args{kwc}, + want: "pkw0", + }, + { + name: "Test Add Kafka Writer 2 ", + args: args{kwc}, + want: "pkw1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := AddKWC(tt.args.kwc) + assert.Equal(t, tt.want, got) + + }) + } + assert.Equal(t, 2, len(KWMap)) +} + +func TestDeleteKWC(t *testing.T) { + type args struct { + kwid string + } + tests := []struct { + name string + args args + delcount int + }{ + { + name: "Test Delete Kafka Writer 1", + args: args{"pkw0"}, + delcount: 1, + }, + { + name: "Test Delete Kafka Writer Non existent", + args: args{"pkw3"}, + delcount: 0, + }, + { + name: "Test Delete Kafka Writer 2", + args: args{"pkw1"}, + delcount: 1, + }, + } + for _, tt := range tests { + l := len(KWMap) + t.Run(tt.name, func(t *testing.T) { + DeleteKWC(tt.args.kwid) + }) + assert.Equal(t, l-tt.delcount, len(KWMap)) + } + assert.Equal(t, 0, len(KWMap)) +} + +func TestListKWC(t *testing.T) { + tests := []struct { + name string + init func() string + want KWRespMap + clean func(string) + }{ + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{"pkw2": { + Broker: "localhost:9092", + Topic: "metrics", + UsePartition: false, + BatchMsgNum: 10000, + Compression: "none", + }}, + init: func() string { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + id, _ := AddKWC(kwc) + return id + }, + clean: func(id string) { + DeleteKWC(id) + }, + }, + { + name: "Test List Kafka Writers Empty", + want: KWRespMap{}, + init: func() string { + return "" + }, + clean: func(string) {}, + }, + } + for _, tt := range tests { + id := tt.init() + t.Run(tt.name, func(t *testing.T) { + if got := ListKWC(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ListKWC() = %v, want %v", got, tt.want) + } + }) + tt.clean(id) + } +} + +func TestCleanup(t *testing.T) { + tests := []struct { + name string + init func() + }{ + { + name: "Test List Kafka Writers Empty", + init: func() { + kwc := NewKWConfig() + kwc.Broker = "localhost:9092" + kwc.Topic = "metrics" + AddKWC(kwc) + AddKWC(kwc) + AddKWC(kwc) + }, + }, + } + for _, tt := range tests { + tt.init() + t.Run(tt.name, func(t *testing.T) { + Cleanup() + }) + assert.Equal(t, 0, len(KWMap)) + } +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go new file mode 100644 index 00000000..42804321 --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go @@ -0,0 +1,83 @@ +/* + * + * Copyright 2019 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafkawriter + +import ( + "encoding/json" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + logger "prom-kafka-writer/pkg/config" +) + +var log = logger.GetLoggerInstance() + +func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error { + log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries) + for _, ts := range metrics.Timeseries { + m := make(model.Metric, len(ts.Labels)) + for _, l := range ts.Labels { + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + log.Debugw("Labels", "Labelset", m) + + for _, s := range ts.Samples { + log.Debugf(" %f %d\n", s.Value, s.Timestamp) + metric := map[string]interface{}{ + "name": m["__name__"], + "labels": m, + "timestamp": s.Timestamp, + "value": s.Value, + } + key := string(m["__name__"]) + jsonMetric, err := json.Marshal(metric) + if err != nil { + log.Errorw("Marshal error", "error", err.Error()) + continue + } + err = publish(kwid, key, jsonMetric) + if err != nil { + log.Error("Failed to produce message") + return err + } + } + } + return nil +} + +func publish(kwid string, key string, jsonMetric []byte) error { + var ( + kwp = KWMap[kwid].Producer + kwc = KWMap[kwid].Config + ) + + tp := getTopicPartition(kwc) + kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric} + err := kwp.Produce(&kwMsg, nil) + if err != nil { + log.Errorw("Kafka Producer Error", "error", err.Error()) + } + return err +} + +func getTopicPartition(kwc KWConfig) kafka.TopicPartition { + p := kafka.PartitionAny + if kwc.UsePartition { + // TODO: Implement partition strategy + p = kafka.PartitionAny + } + return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p} +} diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml new file mode 100644 index 00000000..5e5fd4bc --- /dev/null +++ b/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml @@ -0,0 +1,16 @@ +apiVersion: skaffold/v1beta6 +kind: Config +build: + tagPolicy: + sha256: {} + artifacts: + - context: . + image: prom-kafka-writer +deploy: + kubectl: + manifests: + - kubernetes-manifests/** +profiles: +- name: cloudbuild + build: + googleCloudBuild: {} |