aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer')
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile38
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go51
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod17
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum27
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml10
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml11
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml22
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml63
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml14
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go96
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go130
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json11
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go37
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go33
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go55
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go32
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go28
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml19
18 files changed, 694 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
new file mode 100644
index 00000000..ee476717
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
@@ -0,0 +1,38 @@
+# Use base golang image from Docker Hub
+FROM golang:1.12.7
+
+# Download the dlv (delve) debugger for go (you can comment this out if unused)
+RUN go get -u github.com/go-delve/delve/cmd/dlv
+
+WORKDIR /src/hdfs-writer
+
+RUN mkdir /librdkafka-dir && cd /librdkafka-dir
+RUN git clone https://github.com/edenhill/librdkafka.git && \
+cd librdkafka && \
+./configure --prefix /usr && \
+make && \
+make install
+
+# Install dependencies in go.mod and go.sum
+COPY go.mod go.sum ./
+RUN go mod download
+
+# Copy rest of the application source code
+COPY . ./
+
+# Compile the application to /app.
+RUN go build -o /hdfs-writer -v ./cmd/hdfs-writer
+
+# If you want to use the debugger, you need to modify the entrypoint to the
+# container and point it to the "dlv debug" command:
+# * UNCOMMENT the following ENTRYPOINT statement,
+# * COMMENT OUT the last ENTRYPOINT statement
+# Start the "dlv debug" server on port 3000 of the container. (Note that the
+# application process will NOT start until the debugger is attached.)
+#ENTRYPOINT ["dlv", "debug", "./cmd/hdfs-writer", "--api-version=2", "--headless", "--listen=:3001", "--log", "--log-dest=/home.dlv.log"]
+
+# If you want to run WITHOUT the debugging server:
+# * COMMENT OUT the previous ENTRYPOINT statements,
+# * UNCOMMENT the following ENTRYPOINT statement.
+#ENTRYPOINT ["/bin/sleep", "3600"]
+ENTRYPOINT ["/hdfs-writer"] \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
new file mode 100644
index 00000000..a79a3e06
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
@@ -0,0 +1,51 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "time"
+
+ handler "hdfs-writer/pkg/handler"
+ utils "hdfs-writer/pkg/utils"
+)
+
+func main() {
+
+ slogger := utils.GetLoggerInstance()
+
+ // Create the server
+ httpServer := &http.Server{
+ Handler: handler.CreateRouter(),
+ Addr: ":9393",
+ }
+
+ connectionsClose := make(chan struct{})
+ go func() {
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+ <-c // function literal waiting to receive Interrupt signal
+ fmt.Printf(":::Got the kill signal:::")
+ slogger.Info(":::Got the kill signal:::")
+ for eachWriter, eachChannel := range handler.ChannelMap {
+ slogger.Infof("Closing writer goroutine :: %s", eachWriter)
+ slogger.Infof("eachChannel:: %v", eachChannel)
+ close(eachChannel)
+ // This wait time ensures that the each of the channel is killed before
+ // main routine finishes.
+ time.Sleep(time.Second * 5)
+ }
+ //once all goroutines are signalled, send close to main thread
+ httpServer.Shutdown(context.Background())
+ close(connectionsClose)
+ }()
+
+ // Sever starts listening
+ err := httpServer.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ slogger.Fatal(err)
+ }
+ <-connectionsClose //main thread waiting to receive close signal
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod
new file mode 100644
index 00000000..75df2038
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod
@@ -0,0 +1,17 @@
+module hdfs-writer
+
+go 1.12
+
+require (
+ github.com/colinmarc/hdfs v1.1.3
+ github.com/confluentinc/confluent-kafka-go v1.1.0
+ github.com/golang/protobuf v1.3.2 // indirect
+ github.com/google/uuid v1.1.1
+ github.com/gorilla/mux v1.7.3
+ github.com/pkg/errors v0.8.1 // indirect
+ github.com/stretchr/testify v1.3.0 // indirect
+ go.uber.org/atomic v1.4.0 // indirect
+ go.uber.org/multierr v1.1.0 // indirect
+ go.uber.org/zap v1.10.0
+ gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0
+)
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum
new file mode 100644
index 00000000..53cc1db4
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum
@@ -0,0 +1,27 @@
+github.com/colinmarc/hdfs v1.1.3 h1:662salalXLFmp+ctD+x0aG+xOg62lnVnOJHksXYpFBw=
+github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463TymfZG02I=
+github.com/confluentinc/confluent-kafka-go v1.1.0 h1:HIW7Nkm8IeKRotC34mGY06DwQMf9Mp9PZMyqDxid2wI=
+github.com/confluentinc/confluent-kafka-go v1.1.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
+github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml
new file mode 100644
index 00000000..1fdd7ca2
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml
@@ -0,0 +1,10 @@
+# Preserving a sample config of hdfs
+#NOTE : The kafka config shall come through the REST request as part of writer config
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: configmap-hdfs
+data:
+ #hdfs_url: hdfs1-namenode-1.hdfs1-namenode.hdfs1:8020
+ hdfs_url: hdfs1-namenode:8020
+ \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml
new file mode 100644
index 00000000..503896b3
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml
@@ -0,0 +1,11 @@
+# Preserving a sample config of kafka broker
+#NOTE : The kafka config shall come through the REST request as part of writer config
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: configmap-kafka
+data:
+ #broker: kafka-cluster-kafka-1.kafka-cluster-kafka-brokers.hdfs1.svc.cluster.local:9092
+ broker: kafka-cluster-kafka-bootstrap:9092
+ group: grp1i
+ topic: newTopc9 \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml
new file mode 100644
index 00000000..f5d0833b
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml
@@ -0,0 +1,22 @@
+#Preserving a sample config of writer.
+#NOTE : The writer config shall come through the REST request
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: config-json
+data:
+ config.json: |-
+ [
+ {"writer": {
+ "kafkaConfig": {
+ "broker": "kafka-cluster-kafka-bootstrap:9092",
+ "group": "grp1",
+ "topic": "newTopic9"
+ },
+ "hdfsConfig": {
+ "hdfs_url": "hdfs1-namenode:8020"
+ }
+ }
+ }
+ ]
+ \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml
new file mode 100644
index 00000000..393a1d78
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml
@@ -0,0 +1,63 @@
+# This Deployment manifest defines:
+# - single-replica deployment of the container image, with label "app: go-hello-world"
+# - Pod exposes port 8080
+# - specify PORT environment variable to the container process
+# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: go-hdfs-writer
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: hdfs-writer
+ template:
+ metadata:
+ labels:
+ app: hdfs-writer
+ spec:
+ containers:
+ - name: server
+ image: hdfs-writer
+ volumeMounts:
+ - name: config-volume
+ mountPath: src/hdfs-writer/cmd/hdfs-writer/config.json
+ subPath: config.json
+ ports:
+ - containerPort: 8080
+ env:
+ - name: PORT
+ value: "8080"
+ - name: BROKER
+ valueFrom:
+ configMapKeyRef:
+ name: configmap-kafka
+ key: broker
+ - name: GROUP
+ valueFrom:
+ configMapKeyRef:
+ name: configmap-kafka
+ key: group
+ - name: TOPIC
+ valueFrom:
+ configMapKeyRef:
+ name: configmap-kafka
+ key: topic
+ - name: HDFS_URL
+ valueFrom:
+ configMapKeyRef:
+ name: configmap-hdfs
+ key: hdfs_url
+ resources:
+ requests:
+ memory: "640Mi"
+ cpu: "2500m"
+ limits:
+ memory: "1280Mi"
+ cpu: "5000m"
+ volumes:
+ - name: config-volume
+ configMap:
+ name: config-json
+ terminationGracePeriodSeconds: 3
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml
new file mode 100644
index 00000000..596ad5fd
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml
@@ -0,0 +1,14 @@
+# This is required for testing using the POSTMAN
+kind: Service
+apiVersion: v1
+metadata:
+ name: hdfs-writer-svc
+spec:
+ type: NodePort
+ selector:
+ app: hdfs-writer
+ ports:
+ - nodePort: 30303
+ port: 9393
+ targetPort: 9393
+ \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
new file mode 100644
index 00000000..65021b4a
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -0,0 +1,96 @@
+package handler
+
+
+import (
+ "fmt"
+ "net/http"
+ "io/ioutil"
+ "encoding/json"
+ "github.com/gorilla/mux"
+
+ guuid "github.com/google/uuid"
+ pipeline "hdfs-writer/pkg/pipeline"
+ utils "hdfs-writer/pkg/utils"
+)
+
+
+var slogger = utils.GetLoggerInstance()
+// ChannelMap is the global map to store writerNames as key and channels as values.
+var ChannelMap =make(map[string]chan struct{})
+
+
+// This is a sample test request handler
+func testFunc(w http.ResponseWriter, r *http.Request){
+ slogger.Info("Invoking testFunc ...")
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintln(w,"HTTP Test successful ")
+}
+
+// CreateRouter returns a http handler for the registered URLs
+func CreateRouter() http.Handler{
+ router := mux.NewRouter().StrictSlash(true)
+ slogger.Info("Created router ...")
+ router.HandleFunc("/test", testFunc).Methods("GET")
+ router.HandleFunc("/createWriter", createWriter).Methods("POST")
+ router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ return router
+}
+
+
+// CreateWriter creates a pipeline
+func createWriter(w http.ResponseWriter, r *http.Request){
+ reqBody, _ := ioutil.ReadAll(r.Body)
+ slogger.Info(string(reqBody))
+ var results map[string]interface{}
+ json.Unmarshal(reqBody, &results)
+ if len(results)==0{
+ slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ }
+ writerStr := "writer"
+ writer := results[writerStr].(map[string]interface{})
+ kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
+ hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
+
+ kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
+ hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
+
+ //populate the channelMap
+ pipelineChan := make(chan struct{})
+ slogger.Infof("Channel created by post :: %v", pipelineChan)
+ uuid := guuid.New().String()
+ //slogger.Infof("guuid :: %s",uuid)
+ slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+ writerName := writerStr+"-"+uuid[len(uuid)-4:]
+ slogger.Infof("::writerName:: %s ",writerName)
+ ChannelMap[writerName] = pipelineChan
+
+ // envoke the go routine to build pipeline
+ go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintln(w,successMessage)
+}
+
+
+// deleteWriter deletes a given writer pipeline
+func deleteWriter(w http.ResponseWriter, r *http.Request){
+ vars := mux.Vars(r)
+ writerName := vars["writerName"]
+ if _, keyExists := ChannelMap[writerName]; keyExists{
+ slogger.Infof("::Writer to be closed:: %s",writerName)
+ toBeClosedChannel := ChannelMap[writerName]
+ close(toBeClosedChannel)
+ // deleting the channel from ChannelMap after closure to
+ // avoid closing the closed channel
+ delete(ChannelMap, writerName)
+
+ w.WriteHeader(http.StatusOK)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
+ fmt.Fprintln(w,deleteMessage)
+
+ }else{
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
+ fmt.Fprintln(w,notFoundMessage)
+ }
+
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
new file mode 100644
index 00000000..c5dbd3cd
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
@@ -0,0 +1,130 @@
+package pipeline
+
+import (
+ "fmt"
+ "os"
+ "github.com/colinmarc/hdfs"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ utils "hdfs-writer/pkg/utils"
+
+)
+
+// BuildWriterPipeline builds a pipeline
+func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
+ slogger := utils.GetLoggerInstance()
+ topics := make([]string, 1)
+ topics[0] = k.GetTopic()
+
+ c,err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": k.GetBroker(),
+ "broker.address.family": "v4",
+ "group.id": k.GetGroup(),
+ "session.timeout.ms": 6000,
+ "auto.offset.reset": "earliest"})
+
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
+ os.Exit(1)
+ }
+ fmt.Printf("Created Consumer %v\n", c)
+ err = c.SubscribeTopics(topics, nil)
+
+ run := true
+ setUpPipeline := false
+
+ var hdfsFileWriter *hdfs.FileWriter
+ var hdfsFileWriterError error
+ // HDFS CLIENT CREATION
+ //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
+ client := utils.CreateHdfsClient(h.GetHdfsURL())
+
+
+ for run==true {
+ select {
+ case sig := <-sigchan:
+ client.Close()
+ if hdfsFileWriter!=nil{
+ cleanup(hdfsFileWriter)
+ }
+ slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
+ run = false
+ default:
+ //slogger.Info("Running default option ....")
+ ev := c.Poll(100)
+ if ev == nil {
+ continue
+ }
+ //:: BEGIN : Switch between different types of messages that come out of kafka
+ switch e := ev.(type){
+ case *kafka.Message:
+ slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
+ dataStr := string(e.Value)
+ slogger.Infof("byte array ::: %s", []byte(dataStr))
+ fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+ // create file if it doesnt exists already
+ if fileInfoError != nil {
+ slogger.Infof("Error::: %s",fileInfoError)
+ slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
+ hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
+ if hdfsFileWriterError !=nil {
+ slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
+ "/"+k.GetTopic(), hdfsFileWriterError.Error())
+ panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+ }
+ _= client.Chmod("/"+k.GetTopic(), 0777);
+ }
+ newDataStr := dataStr + "\n"
+ // file exists case, so just append
+ hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
+
+ if hdfsFileWriterError != nil || hdfsFileWriter==nil{
+ if(hdfsFileWriter==nil){
+ slogger.Infof("hdfsFileWriter is NULL !!")
+ }
+ slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
+ "/"+k.GetTopic(),hdfsFileWriterError)
+ panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
+ }
+ bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
+ if bytesWritten > 0 && error == nil {
+ slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+ slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+
+ if setUpPipeline==false{
+ slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+ "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
+ setUpPipeline = true
+ }
+ } else {
+ slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+ }
+ hdfsFileWriter.Close()
+
+ case kafka.Error:
+ // Errors should generally be considered
+ // informational, the client will try to
+ // automatically recover.
+ // But in this example we choose to terminate
+ // the application if all brokers are down.
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ if e.Code() == kafka.ErrAllBrokersDown {
+ run = false
+ }
+
+ default:
+ fmt.Printf("Ignored %v\n", e)
+ } //:: END : Switch between different types of messages that come out of kafka
+ } // END: select channel
+ } // END : infinite loop
+
+ fmt.Printf("Closing the consumer")
+}
+
+func cleanup(h *hdfs.FileWriter){
+ if h!=nil{
+ err := h.Close()
+ if err!=nil{
+ fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+ }
+ }
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
new file mode 100644
index 00000000..9a41d91b
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
@@ -0,0 +1,11 @@
+{"writer": {
+ "kafkaConfig": {
+ "broker": "kafka-cluster-kafka-bootstrap:9092",
+ "group": "grp1",
+ "topic": "newTopic9"
+ },
+ "hdfsConfig": {
+ "hdfs_url": "hdfs1-namenode:8020"
+ }
+}
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
new file mode 100644
index 00000000..ac33bc6a
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
@@ -0,0 +1,37 @@
+package utils
+
+import (
+ "os"
+)
+
+// SetHdfsParametersByObjectMap set the value of the hdfs config parameters
+// and return HdfsConfig object
+func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{
+
+ hc := HdfsConfig{}
+ hc.hdfsURL = m["hdfs_url"].(string)
+ return hc
+
+}
+
+// SetHdfsParametersByEnvVariables sets the hdfs parameters
+func SetHdfsParametersByEnvVariables() HdfsConfig {
+
+ slogger := GetLoggerInstance()
+ hdfsConfigObject := HdfsConfig{
+ hdfsURL: os.Getenv("HDFS_URL"),
+ }
+ slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL)
+ return hdfsConfigObject
+
+}
+
+// HdfsConfig contains hdfs related config items
+type HdfsConfig struct {
+ hdfsURL string
+}
+
+// GetHdfsURL returns HdfsURL
+func (h HdfsConfig) GetHdfsURL() string {
+ return h.hdfsURL
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
new file mode 100644
index 00000000..1a93a5ad
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
@@ -0,0 +1,33 @@
+package utils
+
+import (
+ "fmt"
+ "github.com/colinmarc/hdfs"
+ //"sync"
+ //"go.uber.org/zap"
+)
+
+
+//var clientOnce sync.Once
+//var hdfsClient *hdfs.Client
+//var slogger *zap.SugaredLogger
+
+
+//GetHdfsClientInstance returns a singleton hdfsClient instance
+// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){
+// clientOnce.Do(func(){
+// hdfsClient = createHdfsClient(hdfsURL)
+// })
+// return hdfsClient
+// }
+
+//CreateHdfsClient creates a hdfs client and returns hdfs client
+func CreateHdfsClient(hdfsURL string) (*hdfs.Client){
+ slogger := GetLoggerInstance()
+ hdfsClient, hdfsConnectError := hdfs.New(hdfsURL)
+ if hdfsConnectError !=nil {
+ slogger.Fatalf(":::Error in create hdfsClient::: %v", hdfsConnectError)
+ fmt.Printf("::Unable to initialize hdfsURL, check logs")
+ }
+ return hdfsClient
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
new file mode 100644
index 00000000..080bfd4b
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
@@ -0,0 +1,55 @@
+package utils
+
+
+import (
+ "os"
+)
+
+// SetKafkaParametersByObjectMap sets the value of the kafka parameters
+// and sets the KafkaConfig object
+func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig {
+ kc := KafkaConfig{}
+ kc.broker = m["broker"].(string)
+ kc.group = m["group"].(string)
+ kc.topic = m["topic"].(string)
+
+ return kc
+}
+
+// SetKafkaParametersByEnvVariables sets the kafka parameters
+func SetKafkaParametersByEnvVariables() KafkaConfig {
+ slogger := GetLoggerInstance()
+
+ kafkaConfigObject := KafkaConfig{
+ broker: os.Getenv("BROKER"),
+ group: os.Getenv("GROUP"),
+ topic: os.Getenv("TOPIC"),
+ }
+ slogger.Infof("::broker:: %s", kafkaConfigObject.broker)
+ slogger.Infof("::group:: %s", kafkaConfigObject.group)
+ slogger.Infof("::topic:: %s", kafkaConfigObject.topic)
+
+ return kafkaConfigObject
+}
+
+// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time
+type KafkaConfig struct {
+ broker string
+ group string
+ topic string
+}
+
+// GetBroker returns kafka broker configured
+func (k KafkaConfig) GetBroker() string {
+ return k.broker
+}
+
+// GetGroup returns kafka group configured
+func (k KafkaConfig) GetGroup() string {
+ return k.group
+}
+
+// GetTopic returns kafka topic configured
+func (k KafkaConfig) GetTopic() string {
+ return k.topic
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go
new file mode 100644
index 00000000..0f72e718
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go
@@ -0,0 +1,32 @@
+package utils
+
+import (
+ "go.uber.org/zap"
+ "fmt"
+ "sync"
+)
+
+
+
+var logOnce sync.Once
+var logger *zap.SugaredLogger
+
+//GetLoggerInstance returns a singleton instance of logger
+func GetLoggerInstance() (*zap.SugaredLogger){
+ logOnce.Do(func(){
+ logger = createLogger()
+ })
+ return logger
+}
+
+
+//createLogger returns a SugaredLogger, sugaredLogger can be directly used to generate logs
+func createLogger() (*zap.SugaredLogger){
+ logger, err := zap.NewDevelopment()
+ if err != nil {
+ fmt.Printf("can't initialize zap logger: %v", err)
+ }
+ defer logger.Sync()
+ slogger := logger.Sugar()
+ return slogger
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go
new file mode 100644
index 00000000..bfab64e6
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+ "os"
+ "io/ioutil"
+)
+
+
+//ReadJSON reads the content of a give file and returns as a string
+// used for small config files only.
+func ReadJSON(path string) string {
+ slogger := GetLoggerInstance()
+ jsonFile, err := os.Open(path)
+ if err!=nil{
+ //fmt.Print(err)
+ slogger.Errorf("Unable to open file: %s", path)
+ slogger.Errorf("Error::: %s", err)
+
+ }else{
+ slogger.Infof("Successfully opened config.json")
+ }
+
+ defer jsonFile.Close()
+ byteValue, _ := ioutil.ReadAll(jsonFile)
+ s := string(byteValue)
+ return s
+}
+
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
new file mode 100644
index 00000000..c207967f
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
@@ -0,0 +1,19 @@
+apiVersion: skaffold/v1beta6
+kind: Config
+build:
+ tagPolicy:
+ sha256: {}
+ artifacts:
+ - context: .
+ image: hdfs-writer
+ local:
+ useBuildkit: false
+ useDockerCLI: false
+deploy:
+ kubectl:
+ manifests:
+ - kubernetes-manifests/**
+profiles:
+ - name: cloudbuild
+ build:
+ googleCloudBuild: {}