aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDileep Ranganathan <dileep.ranganathan@intel.com>2019-10-31 15:46:37 -0700
committerDileep Ranganathan <dileep.ranganathan@intel.com>2019-11-11 14:39:44 -0800
commitd11cb99fb101a6798fd57fa44d332737d6637e75 (patch)
tree0d7ae93ac67186a9bd4dd81db85eab9ccd64334f
parent68d118176bb53c36b31a7060cfa16ad5acac1765 (diff)
Prometheus Kafka Writer Microservice
This patch implents Prometheus to remote Kafka Writer Microservice Added sample day-2 config to configure prometheus PS4: Fixed Review comments Issue-ID: ONAPARC-393 Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com> Change-Id: I0bc77175593a165effd7bb1cb4802c988a5ef4ec
-rw-r--r--vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml10
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore99
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile36
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci43
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/Makefile73
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/README.md66
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go71
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/diagram.pngbin0 -> 88448 bytes
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/go.mod18
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/go.sum159
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml27
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml20
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go143
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go289
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go33
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go38
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go129
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go229
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go83
-rw-r--r--vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml16
20 files changed, 1582 insertions, 0 deletions
diff --git a/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml b/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml
new file mode 100644
index 00000000..090e2a1e
--- /dev/null
+++ b/vnfs/DAaaS/deploy/day2_configs/prometheus/add_remote_write_kafka.yaml
@@ -0,0 +1,10 @@
+spec:
+ remoteWrite:
+ - url: "http://prom-kafka-writer.edge1.svc.cluster.local:8686/pkw/pkw0/receive"
+ writeRelabelConfigs:
+ - targetLabel: metrics_storage
+ replacement: kafka_remote
+ - action: keep
+ regex: go_gc_duration_seconds_count
+ sourceLabels:
+ - __name__
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore b/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore
new file mode 100644
index 00000000..ec6f4674
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/.gitignore
@@ -0,0 +1,99 @@
+# Common
+.DS_Store
+.vscode
+*-workspace
+.tox/
+.*.swp
+*.log
+coverage.html
+docs/build
+*.so
+
+# Tests
+*.test
+*.out
+
+# # Directories
+target
+vendor
+src/github.com
+src/golang.org
+
+# Temporary Build Files
+build/_output
+build/_test
+# Created by https://www.gitignore.io/api/go,vim,emacs,visualstudiocode
+### Emacs ###
+# -*- mode: gitignore; -*-
+*~
+\#*\#
+/.emacs.desktop
+/.emacs.desktop.lock
+*.elc
+auto-save-list
+tramp
+.\#*
+# Org-mode
+.org-id-locations
+*_archive
+# flymake-mode
+*_flymake.*
+# eshell files
+/eshell/history
+/eshell/lastdir
+# elpa packages
+/elpa/
+# reftex files
+*.rel
+# AUCTeX auto folder
+/auto/
+# cask packages
+.cask/
+dist/
+# Flycheck
+flycheck_*.el
+# server auth directory
+/server/
+# projectiles files
+.projectile
+projectile-bookmarks.eld
+# directory configuration
+.dir-locals.el
+# saveplace
+places
+# url cache
+url/cache/
+# cedet
+ede-projects.el
+# smex
+smex-items
+# company-statistics
+company-statistics-cache.el
+# anaconda-mode
+anaconda-mode/
+### Go ###
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+# Test binary, build with 'go test -c'
+*.test
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+### Vim ###
+# swap
+.sw[a-p]
+.*.sw[a-p]
+# session
+Session.vim
+# temporary
+.netrwhist
+# auto-generated tag files
+tags
+### VisualStudioCode ###
+.vscode/*
+.history
+# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode
+bin \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile
new file mode 100644
index 00000000..dc8f5fe2
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile
@@ -0,0 +1,36 @@
+# Use base golang image from Docker Hub
+FROM golang:1.12.7
+
+# Download the dlv (delve) debugger for go (you can comment this out if unused)
+RUN go get -u -v github.com/go-delve/delve/cmd/dlv
+
+WORKDIR /src/prom-kafka-writer
+
+RUN mkdir /librdkafka-dir && cd /librdkafka-dir
+RUN git clone https://github.com/edenhill/librdkafka.git && \
+cd librdkafka && \
+./configure --prefix /usr && \
+make && \
+make install
+
+# Install dependencies in go.mod and go.sum
+COPY go.mod go.sum ./
+RUN go mod download
+
+# Copy rest of the application source code
+COPY . ./
+
+# Compile the application to /app.
+RUN go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer
+
+# If you want to use the debugger, you need to modify the entrypoint to the
+# container and point it to the "dlv debug" command:
+# * UNCOMMENT the following ENTRYPOINT statement,
+# * COMMENT OUT the last ENTRYPOINT statement
+# Start the "dlv debug" server on port 3000 of the container.
+ENTRYPOINT ["dlv", "exec", "./bin/prom-kafka-writer", "--continue", "--accept-multiclient", "--api-version=2", "--headless", "--listen=:3000", "--log"]
+
+# If you want to run WITHOUT the debugging server:
+# * COMMENT OUT the previous ENTRYPOINT statements,
+# * UNCOMMENT the following ENTRYPOINT statement.
+# ENTRYPOINT ["/prom-kafka-writer"]
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci
new file mode 100644
index 00000000..a135dbbd
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Dockerfile.ci
@@ -0,0 +1,43 @@
+# Use base golang image from Docker Hub
+FROM golang:1.12.7 as builder
+
+# Download the dlv (delve) debugger for go (you can comment this out if unused)
+RUN go get -u -v github.com/go-delve/delve/cmd/dlv
+
+WORKDIR /src/prom-kafka-writer
+
+RUN mkdir /librdkafka-dir && cd /librdkafka-dir
+RUN git clone https://github.com/edenhill/librdkafka.git && \
+cd librdkafka && \
+./configure --prefix /usr && \
+make && \
+make install
+
+# Install dependencies in go.mod and go.sum
+COPY go.mod go.sum ./
+RUN go mod download
+
+# Copy rest of the application source code
+COPY . ./
+
+# Compile the application to /app.
+RUN go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer
+
+# If you want to use the debugger, you need to modify the entrypoint to the
+# container and point it to the "dlv debug" command:
+# * UNCOMMENT the following ENTRYPOINT statement,
+# * COMMENT OUT the last ENTRYPOINT statement
+# Start the "dlv debug" server on port 3000 of the container.
+#ENTRYPOINT ["dlv", "exec", "./bin/prom-kafka-writer", "--continue", "--accept-multiclient", "--api-version=2", "--headless", "--listen=:3000", "--log"]
+
+# If you want to run WITHOUT the debugging server:
+# * COMMENT OUT the previous ENTRYPOINT statements,
+# * UNCOMMENT the following ENTRYPOINT statement.
+
+# final stage
+FROM ubuntu:18.04
+COPY --from=builder /usr/lib/pkgconfig /usr/lib/pkgconfig
+COPY --from=builder /usr/lib/librdkafka* /usr/lib/
+COPY --from=builder //src/prom-kafka-writer/* /prom-kafka-writer/
+WORKDIR /prom-kafka-writer
+ENTRYPOINT ["./bin/prom-kafka-writer"]
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile b/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile
new file mode 100644
index 00000000..5d8ba317
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/Makefile
@@ -0,0 +1,73 @@
+# SPDX-license-identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2019 Intel Corporation
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# Common
+VERSION := $(shell git describe --tags)
+BUILD := $(shell git rev-parse --short HEAD)
+PROJECTNAME := $(shell basename "$(PWD)")
+
+ROOTPATH := $(shell realpath "$(PWD)/../")
+PKA := prom-kafka-writer
+ifndef IMAGE_NAME
+override IMAGE_NAME := dcr.cluster.local:32644/prom-kafka-writer:latest
+endif
+
+export GO111MODULE=on
+
+.PHONY: clean plugins
+
+## build: Build the binary for prom-kafka-writer
+build: clean format
+ GOOS=linux GOARCH=amd64
+ @go mod download
+ @go build -o ./bin/prom-kafka-writer -v ./cmd/prom-kafka-writer
+
+## all: Delete the image, binary, complete build, test and run coverage
+all: build test cover
+## debug: Build local binary for debugging
+debug:
+ @echo "Use Cloud Code VSCode plugin or Skaffold to debug"
+
+## deploy: Build Dockerfile and publish to repository
+deploy: build test publish
+
+## publish: Push docker image to repository
+publish:
+ @docker push ${IMAGE_NAME}
+.PHONY: test
+## test: run tests
+test: clean
+ @go test -v ./...
+## format: format source code
+format:
+ @go fmt ./...
+
+## clean: clean build artifacts, image, binary
+clean: format
+ @echo "Deleting the prom-kafka-writer binary"
+ @rm -rf ${PKA}
+ @echo "Deleting the prom-kafka-writer docker image"
+ @docker 2>/dev/null rmi ${IMAGE_NAME} | true
+
+.PHONY: cover
+## cover: run tests and generate coverage report
+cover:
+ @go test ./... -coverprofile=coverage.out
+ @go tool cover -html=coverage.out -o coverage.html
+
+.PHONY: help
+## help: Print help message
+help: Makefile
+ @echo
+ @echo " Requires librdkafka v1.1.0 or later, go1.12.5+"
+ @echo
+ @echo " Choose a command run in "$(PROJECTNAME)":"
+ @echo
+ @sed -n 's/^## //p' $< | column -t -s ':' | sed -e 's/^/ /'
+ @echo
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/README.md b/vnfs/DAaaS/microservices/prom-kafka-writer/README.md
new file mode 100644
index 00000000..8d981e1d
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/README.md
@@ -0,0 +1,66 @@
+# Prometheus Kafka Remote Writer
+
+![Architecture Diagram](./diagram.png)
+
+Prometheus Kafka remote writer is a remote writer/adapter to publish metrics from prometheus to a remote storage like kafka.
+The prom-kafka-writer microservice enables to register the remote endpoint and configures the kafka producer endpoint.
+Prometheus can then be configured to set the remote write to this endpoint and the metrics can be published.
+
+----
+## Install
+### Build docker images
+```bash
+git clone https://github.com/onap/demo.git
+DA_WORKING_DIR=$PWD/demo/vnfs/DAaaS
+cd $DA_WORKING_DIR/microservices/prom-kafka-writer
+docker build -f Dockerfile.ci . -t prom-kafka-writer:latest
+```
+### Deploy using Helm chart
+```bash
+cd $DA_WORKING_DIR/deploy/collection/charts/prom-kafka-writer
+
+# Modify the values.yaml
+helm install -n pkw . -f values.yaml --namespace=edge1
+```
+Verify the deployment
+```bash
+kubectl get pods -n edge1 | grep prom-kafka-writer
+
+NAME READY STATUS RESTARTS AGE
+pkw-prom-kafka-writer-c6c89c579-jv6jt 1/1 Running 0 11m
+```
+
+## API
+* Create Prometheus Kafka Writer endpoint
+
+```bash
+ //Request
+ curl -X POST \
+ http://prom-kafka-writer/pkw \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9091",
+ "topic": "topic1",
+ "usePartition": false,
+ "compression.codec": "snappy"
+ }'
+```
+
+* Delete Prometheus Kafka Writer endpoint
+
+```bash
+ //Request
+ curl -X DELETE http://prom-kafka-writer/pkw/pkw0
+```
+
+* List Prometheus Kafka Writers
+```bash
+ //Request
+ curl -X GET http://prom-kafka-writer/pkw
+```
+
+* Publish to Kafka Writer
+```bash
+ //Request
+ curl -X POST http://prom-kafka-writer/pkw/pkw0/receive
+```
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go b/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go
new file mode 100644
index 00000000..08dc38b5
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/cmd/prom-kafka-writer/main.go
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package main
+
+import (
+ "context"
+ "net/http"
+ "os"
+ "os/signal"
+ "time"
+
+ "prom-kafka-writer/pkg/api"
+ logger "prom-kafka-writer/pkg/config"
+ kw "prom-kafka-writer/pkg/kafkawriter"
+)
+
+const defaultAddr = ":8686"
+
+// main starts an http server on the $PORT environment variable.
+func main() {
+ log := logger.GetLoggerInstance()
+
+ addr := defaultAddr
+ // $PORT environment variable is provided in the Kubernetes deployment.
+ if p := os.Getenv("PORT"); p != "" {
+ addr = ":" + p
+ }
+
+ log.Infow("Starting Prometheus Kafka writer", "addr", addr)
+ defer log.Infow("Prometheus Kafka writer Terminated")
+
+ s := &http.Server{
+ Handler: api.NewRouter(),
+ Addr: addr,
+ }
+
+ // shutdown hook. Wait for clean up if the pod/container is killed
+ shutdownChannel := make(chan struct{})
+ go func() {
+ log.Debug("msg", "Creating shutdown hooks")
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, os.Interrupt)
+ <-sigChan
+ log.Debug("msg", "Received os.Interrupt")
+ log.Debug("msg", "Initiate cleanup")
+ //TODO: Cleanup here
+ kw.Cleanup()
+ time.Sleep(time.Second * 3)
+ _ = s.Shutdown(context.Background())
+ close(shutdownChannel)
+ }()
+
+ err := s.ListenAndServe()
+ if err != nil {
+ log.Fatalw("Server Error - Shutting down", "error", err)
+ }
+ <-shutdownChannel
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png b/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png
new file mode 100644
index 00000000..14cff60e
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/diagram.png
Binary files differ
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod b/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod
new file mode 100644
index 00000000..3c0be213
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/go.mod
@@ -0,0 +1,18 @@
+module prom-kafka-writer
+
+go 1.13
+
+require (
+ github.com/golang/protobuf v1.3.2
+ github.com/golang/snappy v0.0.1
+ github.com/gorilla/mux v1.7.3
+ github.com/grpc-ecosystem/grpc-gateway v1.11.3 // indirect
+ github.com/prometheus/client_golang v1.2.1
+ github.com/prometheus/common v0.7.0
+ github.com/prometheus/prometheus v2.5.0+incompatible
+ github.com/stretchr/testify v1.4.0
+ go.uber.org/zap v1.12.0
+ google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 // indirect
+ google.golang.org/grpc v1.24.0 // indirect
+ gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0
+)
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum b/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum
new file mode 100644
index 00000000..eaf935dd
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/go.sum
@@ -0,0 +1,159 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
+github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
+github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/grpc-ecosystem/grpc-gateway v1.11.3 h1:h8+NsYENhxNTuq+dobk3+ODoJtwY4Fu0WQXsxJfL8aM=
+github.com/grpc-ecosystem/grpc-gateway v1.11.3/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.2.1 h1:JnMpQc6ppsNgw9QPAGF6Dod479itz7lvlsMzzNayLOI=
+github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY=
+github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
+github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=
+github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
+github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
+go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
+go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
+go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
+go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw=
+go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
+golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6 h1:UXl+Zk3jqqcbEVV7ace5lrt4YdA4tXiz3f/KbmD29Vo=
+google.golang.org/genproto v0.0.0-20191028173616-919d9bdd9fe6/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
+google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml
new file mode 100644
index 00000000..203c8195
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.deployment.yaml
@@ -0,0 +1,27 @@
+# This Deployment manifest defines:
+# - single-replica deployment of the container image, with label "app: go-prom-kafka-writer"
+# - Pod exposes port 8686
+# - specify PORT environment variable to the container process
+# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: prom-kafka-writer
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: prom-kafka-writer
+ template:
+ metadata:
+ labels:
+ app: prom-kafka-writer
+ spec:
+ containers:
+ - name: server
+ image: dcr.cluster.local:32644/prom-kafka-writer
+ ports:
+ - containerPort: 8686
+ env:
+ - name: PORT
+ value: "8686"
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml
new file mode 100644
index 00000000..f63ef53e
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/kubernetes-manifests/pkw.service.yaml
@@ -0,0 +1,20 @@
+# This Service manifest defines:
+# - a load balancer for pods matching label "app: go-prom-kafka-writer"
+# - exposing the application to the public Internet (type:LoadBalancer)
+# - routes port 80 of the load balancer to the port 8686 of the Pods.
+# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/
+apiVersion: v1
+kind: Service
+metadata:
+ name: prom-kafka-writer
+ labels:
+ app: prom-kafka-writer
+spec:
+ type: NodePort
+ selector:
+ app: prom-kafka-writer
+ ports:
+ - name: http
+ port: 80
+ targetPort: 8686
+ nodePort: 30086
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go
new file mode 100644
index 00000000..d7a2b898
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "encoding/json"
+ "errors"
+ "io"
+ "io/ioutil"
+ "net/http"
+
+ logger "prom-kafka-writer/pkg/config"
+ kw "prom-kafka-writer/pkg/kafkawriter"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/snappy"
+ "github.com/gorilla/mux"
+ "github.com/prometheus/prometheus/prompb"
+)
+
+type kwResponse struct {
+ KWid string `json:"kwid,omitempty"`
+ KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"`
+}
+
+var log = logger.GetLoggerInstance()
+
+// CreateKWHandler - Creates and starts a Prometheus to Kafka writer
+func CreateKWHandler(w http.ResponseWriter, r *http.Request) {
+ log.Infow("Received request for Creating Kafka Writer")
+ kwConfig := kw.NewKWConfig()
+ dec := json.NewDecoder(r.Body)
+ dec.DisallowUnknownFields()
+ err := dec.Decode(kwConfig)
+ switch {
+ case err == io.EOF:
+ http.Error(w, "Body empty", http.StatusBadRequest)
+ return
+ case err != nil:
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
+ kwid, err := kw.AddKWC(kwConfig)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ kwResp := kwResponse{
+ KWid: kwid,
+ }
+ err = json.NewEncoder(w).Encode(kwResp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+// ListKWHandler - Lists the KafkaWriters and its config
+func ListKWHandler(w http.ResponseWriter, r *http.Request) {
+ log.Infow("Received request for List Kafka Writers", "url", r.URL)
+ res := kw.ListKWC()
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ kwResp := kwResponse{
+ KWCRespMap: res,
+ }
+ err := json.NewEncoder(w).Encode(kwResp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+// DeleteKWHandler - Deletes a given Prometheus to Kafka writer
+func DeleteKWHandler(w http.ResponseWriter, r *http.Request) {
+ params := mux.Vars(r)
+ log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"])
+ kw.DeleteKWC(params["kwid"])
+
+ //Send response back
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+}
+
+// ReceiveKWHandler - Publish metrics from Prometheus to Kafka
+func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) {
+ params := mux.Vars(r)
+ kwid := params["kwid"]
+ if _, ok := kw.KWMap[kwid]; !ok {
+ notRegisteredErr := errors.New("kafka writer not registered").Error()
+ log.Error(notRegisteredErr)
+ http.Error(w, notRegisteredErr, http.StatusNotFound)
+ return
+ }
+ log.Infow("Produce message on Kafka Writer", "kwid", kwid)
+
+ compressed, err := ioutil.ReadAll(r.Body)
+ defer r.Body.Close()
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ metricBuf, err := snappy.Decode(nil, compressed)
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ var metrics prompb.WriteRequest
+ if err := proto.Unmarshal(metricBuf, &metrics); err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ err = kw.PublishTimeSeries(kwid, &metrics)
+ if err != nil {
+ log.Error("error", err.Error())
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go
new file mode 100644
index 00000000..19c4c0ab
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler_test.go
@@ -0,0 +1,289 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/snappy"
+ "github.com/prometheus/prometheus/prompb"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "prom-kafka-writer/pkg/kafkawriter"
+ "testing"
+)
+
+type errReader int
+
+func (errReader) Read(p []byte) (n int, err error) {
+ return 0, errors.New("test error")
+}
+
+func TestCreateKWHandler(t *testing.T) {
+ tests := []struct {
+ name string
+ body io.Reader
+ expectStatus int
+ expectResp *kwResponse
+ }{
+ {
+ name: "Test Create Kafka Writer",
+ body: bytes.NewBuffer([]byte(`{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "compression.codec": "snappy"
+ }`)),
+ expectStatus: http.StatusCreated,
+ expectResp: &kwResponse{
+ KWid: "pkw0",
+ },
+ },
+ {
+ name: "Test Create Kafka Writer Wrong parameters",
+ body: bytes.NewBuffer([]byte(`{
+ "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "kafkatopic": "adatopic1",
+ "usePartition": false,
+ "compression.codec": "snappy"
+ }`)),
+ expectStatus: http.StatusUnprocessableEntity,
+ expectResp: &kwResponse{},
+ },
+ {
+ name: "Test Create Kafka Writer Empty Body",
+ body: bytes.NewBuffer([]byte(nil)),
+ expectStatus: http.StatusBadRequest,
+ expectResp: &kwResponse{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ req := httptest.NewRequest("POST", "/pkw", tt.body)
+ rec := httptest.NewRecorder()
+ r := NewRouter()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ kwResp := &kwResponse{}
+ json.NewDecoder(resp.Body).Decode(&kwResp)
+ assert.Equal(t, tt.expectResp, kwResp)
+ })
+ }
+}
+
+func TestListKWHandler(t *testing.T) {
+
+ tests := []struct {
+ name string
+ body string
+ expectStatus int
+ expectResp *kwResponse
+ }{
+ {
+ name: "Test List Kafka Writers",
+ body: `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`,
+ expectStatus: http.StatusOK,
+ expectResp: &kwResponse{
+ KWCRespMap: map[string]kafkawriter.KWConfig{
+ "pkw0": {
+ Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ Topic: "adatopic1",
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "snappy",
+ },
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ preCreateKW("pkw0", tt.body)
+ req := httptest.NewRequest("GET", "/pkw", nil)
+ rec := httptest.NewRecorder()
+ r := NewRouter()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ kwResp := &kwResponse{}
+ json.NewDecoder(resp.Body).Decode(&kwResp)
+ assert.Equal(t, tt.expectResp, kwResp)
+ })
+ }
+}
+
+func TestDeleteKWHandler(t *testing.T) {
+ tests := []struct {
+ name string
+ kwid string
+ expectStatus int
+ }{
+ {
+ name: "Test Delete Kafka Writer",
+ kwid: "pkw777",
+ expectStatus: http.StatusOK,
+ },
+ }
+ body := `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ preCreateKW(tt.kwid, body)
+ target := fmt.Sprintf("/pkw/%s", tt.kwid)
+ req := httptest.NewRequest("DELETE", target, nil)
+ r := NewRouter()
+ rec := httptest.NewRecorder()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ })
+ }
+}
+
+func preCreateKW(kwid string, body string) {
+ kafkawriter.Cleanup()
+ k := []byte(body)
+ kwc := &kafkawriter.KWConfig{}
+ _ = json.Unmarshal(k, kwc)
+ producer, _ := kafkawriter.NewKafkaWriter(kwc)
+ kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer}
+}
+
+func TestReceiveKWHandler(t *testing.T) {
+ f, err := buildRemoteWriteRequest()
+ if err != nil {
+ t.Fatal("Could not build prompb.WriteRequest")
+ }
+ tests := []struct {
+ name string
+ kwid string
+ body io.Reader
+ preCreate bool
+ expectStatus int
+ }{
+ {
+ name: "Test Receive Messages Empty Message",
+ kwid: "pkw111",
+ preCreate: true,
+ expectStatus: http.StatusBadRequest,
+ },
+ {
+ name: "Test Receive Messages",
+ kwid: "pkw111",
+ preCreate: true,
+ body: bytes.NewReader(f),
+ expectStatus: http.StatusOK,
+ },
+ {
+ name: "Test Receive Messages Kafka Writer Not registed",
+ kwid: "pkw222",
+ preCreate: false,
+ expectStatus: http.StatusNotFound,
+ },
+ {
+ name: "Test Receive Messages Kafka Writer Not registed",
+ kwid: "pkw111",
+ preCreate: true,
+ body: errReader(0),
+ expectStatus: http.StatusInternalServerError,
+ },
+ }
+ body := `{
+ "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
+ "topic": "adatopic1",
+ "usePartition": false,
+ "batch.num.messages": 10000,
+ "compression.codec": "snappy"
+ }`
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.preCreate {
+ preCreateKW(tt.kwid, body)
+ }
+ target := fmt.Sprintf("/pkw/%s/receive", tt.kwid)
+ req := httptest.NewRequest("POST", target, tt.body)
+ r := NewRouter()
+ rec := httptest.NewRecorder()
+ r.ServeHTTP(rec, req)
+ resp := rec.Result()
+ assert.Equal(t, tt.expectStatus, resp.StatusCode)
+ })
+ }
+}
+
+func buildRemoteWriteRequest() ([]byte, error) {
+ var buf []byte
+ samples := []*prompb.TimeSeries{
+ &prompb.TimeSeries{
+ Labels: []*prompb.Label{
+ &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"},
+ &prompb.Label{Name: "endpoint", Value: "http"},
+ &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"},
+ &prompb.Label{Name: "job", Value: "prom-kafka-writer"},
+ &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"},
+ &prompb.Label{Name: "namespace", Value: "edge1"},
+ &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"},
+ &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"},
+ &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"},
+ &prompb.Label{Name: "service", Value: "prom-kafka-writer"},
+ },
+ Samples: []prompb.Sample{
+ prompb.Sample{
+ Value: 17,
+ Timestamp: 1572479934007,
+ },
+ prompb.Sample{
+ Value: 19,
+ Timestamp: 1572480144007,
+ },
+ },
+ },
+ }
+ req := &prompb.WriteRequest{
+ Timeseries: samples,
+ }
+
+ data, err := proto.Marshal(req)
+ if err != nil {
+ return nil, err
+ }
+
+ // snappy uses len() to see if it needs to allocate a new slice. Make the
+ // buffer as long as possible.
+ if buf != nil {
+ buf = buf[0:cap(buf)]
+ }
+ compressed := snappy.Encode(buf, data)
+ return compressed, nil
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go
new file mode 100644
index 00000000..fb78afe2
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/router.go
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+ "github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+func NewRouter() *mux.Router {
+ r := mux.NewRouter()
+ r.HandleFunc("/pkw", ListKWHandler).Methods("GET")
+ r.HandleFunc("/pkw", CreateKWHandler).Methods("POST")
+ r.HandleFunc("/pkw/{kwid}", DeleteKWHandler).Methods("DELETE")
+ r.HandleFunc("/pkw/{kwid}/receive", ReceiveKWHandler).Methods("POST")
+
+ // Metrics Handler for prom-kafka-writer
+ r.Handle("/metrics", promhttp.Handler())
+ return r
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go
new file mode 100644
index 00000000..2a5921f1
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/config/logger.go
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package config
+
+import (
+ "go.uber.org/zap"
+ "log"
+ "sync"
+)
+
+var logOnce sync.Once
+var slogger *zap.SugaredLogger
+
+//GetLoggerInstance returns a singleton instance of logger
+func GetLoggerInstance() *zap.SugaredLogger {
+ logOnce.Do(func() {
+ logger, err := zap.NewProduction()
+ if err != nil {
+ log.Fatalf("can't initialize zap logger: %v", err)
+ }
+ defer logger.Sync()
+ slogger = logger.Sugar()
+ })
+ return slogger
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go
new file mode 100644
index 00000000..f56f66aa
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+ "strconv"
+ "sync"
+)
+
+//KWConfig - serialized type for config related to Kafka
+type KWConfig struct {
+ //Broker - Kafka Bootstrap servers (comma separated)
+ Broker string `json:"bootstrap.servers"`
+ //Topic - kafka topic name
+ Topic string `json:"topic"`
+ //UsePartition - Enforce use of partitions
+ UsePartition bool `json:"usePartition"`
+ BatchMsgNum int `json:"batch.num.messages,omitempty"`
+ Compression string `json:"compression.codec,omitempty"`
+}
+
+//KWProducer - holds the Kafka Config and associated Kafka Producer
+type KWProducer struct {
+ Config KWConfig
+ Producer *kafka.Producer
+}
+
+//KWRespMap packs the KWConfig and kwid for List Api
+type KWRespMap map[string]KWConfig
+
+//KWMap - Stores the Kafka Writer to Kafka Producer Mapping
+// This is used to uniquely identify a Kafka Writer - Producer mapping.
+var (
+ KWMap = make(map[string]KWProducer)
+ kwMutex sync.Mutex
+ id int
+)
+
+// NewKafkaWriter - creates a new producer using kafka config.
+// Handles the remote write from prometheus and send to kafka topic
+func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) {
+ producer, err := kafka.NewProducer(&kafka.ConfigMap{
+ "bootstrap.servers": kwc.Broker,
+ "compression.codec": kwc.Compression,
+ "batch.num.messages": kwc.BatchMsgNum,
+ "go.batch.producer": true,
+ "go.delivery.reports": false,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return producer, nil
+}
+
+//NewKWConfig - creates a KWConfig object with default values
+func NewKWConfig() *KWConfig {
+ return &KWConfig{
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ }
+}
+
+//NewKWRespMap - packs the KWConfig and kwid for List Api
+func newKWRespMap() KWRespMap {
+ kwr := make(KWRespMap)
+ return kwr
+}
+
+//AddKWC - Method to add KafkaWriterConfig request to KWMap
+func AddKWC(kwc *KWConfig) (string, error) {
+ kwMutex.Lock()
+ defer kwMutex.Unlock()
+ //TODO: Generate kwid
+ kwid := "pkw" + strconv.Itoa(id)
+ id++
+ producer, err := NewKafkaWriter(kwc)
+ if err != nil {
+ log.Error("Error", err)
+ id--
+ return "", err
+ }
+
+ KWMap[kwid] = KWProducer{
+ Config: *kwc,
+ Producer: producer,
+ }
+ return kwid, nil
+}
+
+//DeleteKWC - Method to add KafkaWriter request to KWMap
+func DeleteKWC(kwid string) {
+ kwMutex.Lock()
+ defer kwMutex.Unlock()
+ if _, ok := KWMap[kwid]; ok {
+ KWMap[kwid].Producer.Close()
+ }
+ delete(KWMap, kwid)
+}
+
+//ListKWC - Method to add KafkaWriter request to KWMap
+func ListKWC() KWRespMap {
+ kwr := newKWRespMap()
+ for k, v := range KWMap {
+ kwr[k] = v.Config
+ }
+ return kwr
+}
+
+//Cleanup - Method to cleanup resources
+func Cleanup() {
+ for k := range KWMap {
+ DeleteKWC(k)
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go
new file mode 100644
index 00000000..6869452f
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter_test.go
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "github.com/stretchr/testify/assert"
+ "reflect"
+ "testing"
+)
+
+func TestNewKafkaWriter(t *testing.T) {
+ type args struct {
+ kwc *KWConfig
+ }
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+
+ kwc2 := NewKWConfig()
+ kwc2.Broker = "localhost:9092"
+ kwc2.Topic = "metrics"
+ kwc2.BatchMsgNum = 0
+
+ tests := []struct {
+ name string
+ args args
+ want interface{}
+ }{
+ {
+ name: "Test New Kafka Writer",
+ args: args{kwc},
+ want: "rdkafka#producer-1",
+ },
+ {
+ name: "Test New Kafka Writer Wrong Config",
+ args: args{kwc2},
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, _ := NewKafkaWriter(tt.args.kwc)
+ if tt.want == nil {
+ assert.Equal(t, tt.want, nil)
+ } else {
+ if !reflect.DeepEqual(got.String(), tt.want) {
+ t.Errorf("NewKafkaWriter() = %v, want %v", got, tt.want)
+ }
+ }
+ })
+ }
+}
+
+func TestNewKWConfig(t *testing.T) {
+ tests := []struct {
+ name string
+ want *KWConfig
+ }{
+ {
+ name: "Test New Kafka Config",
+ want: &KWConfig{
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := NewKWConfig(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("NewKWConfig() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestAddKWC(t *testing.T) {
+ type args struct {
+ kwc *KWConfig
+ }
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "Test Add Kafka Writer 1",
+ args: args{kwc},
+ want: "pkw0",
+ },
+ {
+ name: "Test Add Kafka Writer 2 ",
+ args: args{kwc},
+ want: "pkw1",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, _ := AddKWC(tt.args.kwc)
+ assert.Equal(t, tt.want, got)
+
+ })
+ }
+ assert.Equal(t, 2, len(KWMap))
+}
+
+func TestDeleteKWC(t *testing.T) {
+ type args struct {
+ kwid string
+ }
+ tests := []struct {
+ name string
+ args args
+ delcount int
+ }{
+ {
+ name: "Test Delete Kafka Writer 1",
+ args: args{"pkw0"},
+ delcount: 1,
+ },
+ {
+ name: "Test Delete Kafka Writer Non existent",
+ args: args{"pkw3"},
+ delcount: 0,
+ },
+ {
+ name: "Test Delete Kafka Writer 2",
+ args: args{"pkw1"},
+ delcount: 1,
+ },
+ }
+ for _, tt := range tests {
+ l := len(KWMap)
+ t.Run(tt.name, func(t *testing.T) {
+ DeleteKWC(tt.args.kwid)
+ })
+ assert.Equal(t, l-tt.delcount, len(KWMap))
+ }
+ assert.Equal(t, 0, len(KWMap))
+}
+
+func TestListKWC(t *testing.T) {
+ tests := []struct {
+ name string
+ init func() string
+ want KWRespMap
+ clean func(string)
+ }{
+ {
+ name: "Test List Kafka Writers Empty",
+ want: KWRespMap{"pkw2": {
+ Broker: "localhost:9092",
+ Topic: "metrics",
+ UsePartition: false,
+ BatchMsgNum: 10000,
+ Compression: "none",
+ }},
+ init: func() string {
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ id, _ := AddKWC(kwc)
+ return id
+ },
+ clean: func(id string) {
+ DeleteKWC(id)
+ },
+ },
+ {
+ name: "Test List Kafka Writers Empty",
+ want: KWRespMap{},
+ init: func() string {
+ return ""
+ },
+ clean: func(string) {},
+ },
+ }
+ for _, tt := range tests {
+ id := tt.init()
+ t.Run(tt.name, func(t *testing.T) {
+ if got := ListKWC(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("ListKWC() = %v, want %v", got, tt.want)
+ }
+ })
+ tt.clean(id)
+ }
+}
+
+func TestCleanup(t *testing.T) {
+ tests := []struct {
+ name string
+ init func()
+ }{
+ {
+ name: "Test List Kafka Writers Empty",
+ init: func() {
+ kwc := NewKWConfig()
+ kwc.Broker = "localhost:9092"
+ kwc.Topic = "metrics"
+ AddKWC(kwc)
+ AddKWC(kwc)
+ AddKWC(kwc)
+ },
+ },
+ }
+ for _, tt := range tests {
+ tt.init()
+ t.Run(tt.name, func(t *testing.T) {
+ Cleanup()
+ })
+ assert.Equal(t, 0, len(KWMap))
+ }
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
new file mode 100644
index 00000000..42804321
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/producer.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+ "encoding/json"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/prompb"
+ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+ logger "prom-kafka-writer/pkg/config"
+)
+
+var log = logger.GetLoggerInstance()
+
+func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error {
+ log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries)
+ for _, ts := range metrics.Timeseries {
+ m := make(model.Metric, len(ts.Labels))
+ for _, l := range ts.Labels {
+ m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+ }
+ log.Debugw("Labels", "Labelset", m)
+
+ for _, s := range ts.Samples {
+ log.Debugf(" %f %d\n", s.Value, s.Timestamp)
+ metric := map[string]interface{}{
+ "name": m["__name__"],
+ "labels": m,
+ "timestamp": s.Timestamp,
+ "value": s.Value,
+ }
+ key := string(m["__name__"])
+ jsonMetric, err := json.Marshal(metric)
+ if err != nil {
+ log.Errorw("Marshal error", "error", err.Error())
+ continue
+ }
+ err = publish(kwid, key, jsonMetric)
+ if err != nil {
+ log.Error("Failed to produce message")
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func publish(kwid string, key string, jsonMetric []byte) error {
+ var (
+ kwp = KWMap[kwid].Producer
+ kwc = KWMap[kwid].Config
+ )
+
+ tp := getTopicPartition(kwc)
+ kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric}
+ err := kwp.Produce(&kwMsg, nil)
+ if err != nil {
+ log.Errorw("Kafka Producer Error", "error", err.Error())
+ }
+ return err
+}
+
+func getTopicPartition(kwc KWConfig) kafka.TopicPartition {
+ p := kafka.PartitionAny
+ if kwc.UsePartition {
+ // TODO: Implement partition strategy
+ p = kafka.PartitionAny
+ }
+ return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p}
+}
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml b/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml
new file mode 100644
index 00000000..5e5fd4bc
--- /dev/null
+++ b/vnfs/DAaaS/microservices/prom-kafka-writer/skaffold.yaml
@@ -0,0 +1,16 @@
+apiVersion: skaffold/v1beta6
+kind: Config
+build:
+ tagPolicy:
+ sha256: {}
+ artifacts:
+ - context: .
+ image: prom-kafka-writer
+deploy:
+ kubectl:
+ manifests:
+ - kubernetes-manifests/**
+profiles:
+- name: cloudbuild
+ build:
+ googleCloudBuild: {}