From 99f7370360201104ddfc99b5e766b4e32e8524cc Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Tue, 15 Oct 2019 00:48:18 +0000 Subject: HDFSWriter microservice working copy Issue-ID: ONAPARC-453 Signed-off-by: Rajamohan Raj Change-Id: I11c91b642e466763c1ca6f5734bf81fb260e2b39 --- .../GoApps/src/go-hdfs-writer/Dockerfile | 38 ++++++ .../src/go-hdfs-writer/cmd/hdfs-writer/main.go | 51 ++++++++ .../microservices/GoApps/src/go-hdfs-writer/go.mod | 17 +++ .../microservices/GoApps/src/go-hdfs-writer/go.sum | 27 +++++ .../kubernetes-manifests/configmap_hdfs.yaml | 10 ++ .../kubernetes-manifests/configmap_kafka.yaml | 11 ++ .../kubernetes-manifests/configmap_writer.yaml | 22 ++++ .../hdfs_writer_deployment.yaml | 63 ++++++++++ .../kubernetes-manifests/hdfs_writer_service.yaml | 14 +++ .../src/go-hdfs-writer/pkg/handler/handler.go | 96 +++++++++++++++ .../src/go-hdfs-writer/pkg/pipeline/pipeline.go | 130 +++++++++++++++++++++ .../pkg/sample-rest-requests/createWriter.json | 11 ++ .../src/go-hdfs-writer/pkg/utils/hdfs-config.go | 37 ++++++ .../src/go-hdfs-writer/pkg/utils/hdfsUtils.go | 33 ++++++ .../src/go-hdfs-writer/pkg/utils/kafka-config.go | 55 +++++++++ .../src/go-hdfs-writer/pkg/utils/logutils.go | 32 +++++ .../src/go-hdfs-writer/pkg/utils/readJson.go | 28 +++++ .../GoApps/src/go-hdfs-writer/skaffold.yaml | 19 +++ 18 files changed, 694 insertions(+) create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer') diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile new file mode 100644 index 00000000..ee476717 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile @@ -0,0 +1,38 @@ +# Use base golang image from Docker Hub +FROM golang:1.12.7 + +# Download the dlv (delve) debugger for go (you can comment this out if unused) +RUN go get -u github.com/go-delve/delve/cmd/dlv + +WORKDIR /src/hdfs-writer + +RUN mkdir /librdkafka-dir && cd /librdkafka-dir +RUN git clone https://github.com/edenhill/librdkafka.git && \ +cd librdkafka && \ +./configure --prefix /usr && \ +make && \ +make install + +# Install dependencies in go.mod and go.sum +COPY go.mod go.sum ./ +RUN go mod download + +# Copy rest of the application source code +COPY . ./ + +# Compile the application to /app. +RUN go build -o /hdfs-writer -v ./cmd/hdfs-writer + +# If you want to use the debugger, you need to modify the entrypoint to the +# container and point it to the "dlv debug" command: +# * UNCOMMENT the following ENTRYPOINT statement, +# * COMMENT OUT the last ENTRYPOINT statement +# Start the "dlv debug" server on port 3000 of the container. (Note that the +# application process will NOT start until the debugger is attached.) +#ENTRYPOINT ["dlv", "debug", "./cmd/hdfs-writer", "--api-version=2", "--headless", "--listen=:3001", "--log", "--log-dest=/home.dlv.log"] + +# If you want to run WITHOUT the debugging server: +# * COMMENT OUT the previous ENTRYPOINT statements, +# * UNCOMMENT the following ENTRYPOINT statement. +#ENTRYPOINT ["/bin/sleep", "3600"] +ENTRYPOINT ["/hdfs-writer"] \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go new file mode 100644 index 00000000..a79a3e06 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "os/signal" + "time" + + handler "hdfs-writer/pkg/handler" + utils "hdfs-writer/pkg/utils" +) + +func main() { + + slogger := utils.GetLoggerInstance() + + // Create the server + httpServer := &http.Server{ + Handler: handler.CreateRouter(), + Addr: ":9393", + } + + connectionsClose := make(chan struct{}) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c // function literal waiting to receive Interrupt signal + fmt.Printf(":::Got the kill signal:::") + slogger.Info(":::Got the kill signal:::") + for eachWriter, eachChannel := range handler.ChannelMap { + slogger.Infof("Closing writer goroutine :: %s", eachWriter) + slogger.Infof("eachChannel:: %v", eachChannel) + close(eachChannel) + // This wait time ensures that the each of the channel is killed before + // main routine finishes. + time.Sleep(time.Second * 5) + } + //once all goroutines are signalled, send close to main thread + httpServer.Shutdown(context.Background()) + close(connectionsClose) + }() + + // Sever starts listening + err := httpServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + slogger.Fatal(err) + } + <-connectionsClose //main thread waiting to receive close signal +} diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod new file mode 100644 index 00000000..75df2038 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod @@ -0,0 +1,17 @@ +module hdfs-writer + +go 1.12 + +require ( + github.com/colinmarc/hdfs v1.1.3 + github.com/confluentinc/confluent-kafka-go v1.1.0 + github.com/golang/protobuf v1.3.2 // indirect + github.com/google/uuid v1.1.1 + github.com/gorilla/mux v1.7.3 + github.com/pkg/errors v0.8.1 // indirect + github.com/stretchr/testify v1.3.0 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 + gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 +) diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum new file mode 100644 index 00000000..53cc1db4 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum @@ -0,0 +1,27 @@ +github.com/colinmarc/hdfs v1.1.3 h1:662salalXLFmp+ctD+x0aG+xOg62lnVnOJHksXYpFBw= +github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463TymfZG02I= +github.com/confluentinc/confluent-kafka-go v1.1.0 h1:HIW7Nkm8IeKRotC34mGY06DwQMf9Mp9PZMyqDxid2wI= +github.com/confluentinc/confluent-kafka-go v1.1.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml new file mode 100644 index 00000000..1fdd7ca2 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml @@ -0,0 +1,10 @@ +# Preserving a sample config of hdfs +#NOTE : The kafka config shall come through the REST request as part of writer config +apiVersion: v1 +kind: ConfigMap +metadata: + name: configmap-hdfs +data: + #hdfs_url: hdfs1-namenode-1.hdfs1-namenode.hdfs1:8020 + hdfs_url: hdfs1-namenode:8020 + \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml new file mode 100644 index 00000000..503896b3 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml @@ -0,0 +1,11 @@ +# Preserving a sample config of kafka broker +#NOTE : The kafka config shall come through the REST request as part of writer config +apiVersion: v1 +kind: ConfigMap +metadata: + name: configmap-kafka +data: + #broker: kafka-cluster-kafka-1.kafka-cluster-kafka-brokers.hdfs1.svc.cluster.local:9092 + broker: kafka-cluster-kafka-bootstrap:9092 + group: grp1i + topic: newTopc9 \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml new file mode 100644 index 00000000..f5d0833b --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml @@ -0,0 +1,22 @@ +#Preserving a sample config of writer. +#NOTE : The writer config shall come through the REST request +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-json +data: + config.json: |- + [ + {"writer": { + "kafkaConfig": { + "broker": "kafka-cluster-kafka-bootstrap:9092", + "group": "grp1", + "topic": "newTopic9" + }, + "hdfsConfig": { + "hdfs_url": "hdfs1-namenode:8020" + } + } + } + ] + \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml new file mode 100644 index 00000000..393a1d78 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml @@ -0,0 +1,63 @@ +# This Deployment manifest defines: +# - single-replica deployment of the container image, with label "app: go-hello-world" +# - Pod exposes port 8080 +# - specify PORT environment variable to the container process +# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: go-hdfs-writer +spec: + replicas: 1 + selector: + matchLabels: + app: hdfs-writer + template: + metadata: + labels: + app: hdfs-writer + spec: + containers: + - name: server + image: hdfs-writer + volumeMounts: + - name: config-volume + mountPath: src/hdfs-writer/cmd/hdfs-writer/config.json + subPath: config.json + ports: + - containerPort: 8080 + env: + - name: PORT + value: "8080" + - name: BROKER + valueFrom: + configMapKeyRef: + name: configmap-kafka + key: broker + - name: GROUP + valueFrom: + configMapKeyRef: + name: configmap-kafka + key: group + - name: TOPIC + valueFrom: + configMapKeyRef: + name: configmap-kafka + key: topic + - name: HDFS_URL + valueFrom: + configMapKeyRef: + name: configmap-hdfs + key: hdfs_url + resources: + requests: + memory: "640Mi" + cpu: "2500m" + limits: + memory: "1280Mi" + cpu: "5000m" + volumes: + - name: config-volume + configMap: + name: config-json + terminationGracePeriodSeconds: 3 diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml new file mode 100644 index 00000000..596ad5fd --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml @@ -0,0 +1,14 @@ +# This is required for testing using the POSTMAN +kind: Service +apiVersion: v1 +metadata: + name: hdfs-writer-svc +spec: + type: NodePort + selector: + app: hdfs-writer + ports: + - nodePort: 30303 + port: 9393 + targetPort: 9393 + \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go new file mode 100644 index 00000000..65021b4a --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go @@ -0,0 +1,96 @@ +package handler + + +import ( + "fmt" + "net/http" + "io/ioutil" + "encoding/json" + "github.com/gorilla/mux" + + guuid "github.com/google/uuid" + pipeline "hdfs-writer/pkg/pipeline" + utils "hdfs-writer/pkg/utils" +) + + +var slogger = utils.GetLoggerInstance() +// ChannelMap is the global map to store writerNames as key and channels as values. +var ChannelMap =make(map[string]chan struct{}) + + +// This is a sample test request handler +func testFunc(w http.ResponseWriter, r *http.Request){ + slogger.Info("Invoking testFunc ...") + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,"HTTP Test successful ") +} + +// CreateRouter returns a http handler for the registered URLs +func CreateRouter() http.Handler{ + router := mux.NewRouter().StrictSlash(true) + slogger.Info("Created router ...") + router.HandleFunc("/test", testFunc).Methods("GET") + router.HandleFunc("/createWriter", createWriter).Methods("POST") + router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE") + return router +} + + +// CreateWriter creates a pipeline +func createWriter(w http.ResponseWriter, r *http.Request){ + reqBody, _ := ioutil.ReadAll(r.Body) + slogger.Info(string(reqBody)) + var results map[string]interface{} + json.Unmarshal(reqBody, &results) + if len(results)==0{ + slogger.Fatalf("Unable to read from the config json file, unable to create configObject map") + } + writerStr := "writer" + writer := results[writerStr].(map[string]interface{}) + kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{}) + hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{}) + + kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj) + hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj) + + //populate the channelMap + pipelineChan := make(chan struct{}) + slogger.Infof("Channel created by post :: %v", pipelineChan) + uuid := guuid.New().String() + //slogger.Infof("guuid :: %s",uuid) + slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") + writerName := writerStr+"-"+uuid[len(uuid)-4:] + slogger.Infof("::writerName:: %s ",writerName) + ChannelMap[writerName] = pipelineChan + + // envoke the go routine to build pipeline + go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName]) + successMessage := fmt.Sprintf("Created the writer ::%s", writerName) + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,successMessage) +} + + +// deleteWriter deletes a given writer pipeline +func deleteWriter(w http.ResponseWriter, r *http.Request){ + vars := mux.Vars(r) + writerName := vars["writerName"] + if _, keyExists := ChannelMap[writerName]; keyExists{ + slogger.Infof("::Writer to be closed:: %s",writerName) + toBeClosedChannel := ChannelMap[writerName] + close(toBeClosedChannel) + // deleting the channel from ChannelMap after closure to + // avoid closing the closed channel + delete(ChannelMap, writerName) + + w.WriteHeader(http.StatusOK) + deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName) + fmt.Fprintln(w,deleteMessage) + + }else{ + notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName) + fmt.Fprintln(w,notFoundMessage) + } + +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go new file mode 100644 index 00000000..c5dbd3cd --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go @@ -0,0 +1,130 @@ +package pipeline + +import ( + "fmt" + "os" + "github.com/colinmarc/hdfs" + "github.com/confluentinc/confluent-kafka-go/kafka" + utils "hdfs-writer/pkg/utils" + +) + +// BuildWriterPipeline builds a pipeline +func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) { + slogger := utils.GetLoggerInstance() + topics := make([]string, 1) + topics[0] = k.GetTopic() + + c,err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": k.GetBroker(), + "broker.address.family": "v4", + "group.id": k.GetGroup(), + "session.timeout.ms": 6000, + "auto.offset.reset": "earliest"}) + + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) + os.Exit(1) + } + fmt.Printf("Created Consumer %v\n", c) + err = c.SubscribeTopics(topics, nil) + + run := true + setUpPipeline := false + + var hdfsFileWriter *hdfs.FileWriter + var hdfsFileWriterError error + // HDFS CLIENT CREATION + //client := utils.GetHdfsClientInstance(h.GetHdfsURL()) + client := utils.CreateHdfsClient(h.GetHdfsURL()) + + + for run==true { + select { + case sig := <-sigchan: + client.Close() + if hdfsFileWriter!=nil{ + cleanup(hdfsFileWriter) + } + slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName) + run = false + default: + //slogger.Info("Running default option ....") + ev := c.Poll(100) + if ev == nil { + continue + } + //:: BEGIN : Switch between different types of messages that come out of kafka + switch e := ev.(type){ + case *kafka.Message: + slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value) + dataStr := string(e.Value) + slogger.Infof("byte array ::: %s", []byte(dataStr)) + fileInfo, fileInfoError := client.Stat("/" + k.GetTopic()) + // create file if it doesnt exists already + if fileInfoError != nil { + slogger.Infof("Error::: %s",fileInfoError) + slogger.Infof("Creating file::: %s", "/"+k.GetTopic()) + hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic()) + if hdfsFileWriterError !=nil { + slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s", + "/"+k.GetTopic(), hdfsFileWriterError.Error()) + panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic())) + } + _= client.Chmod("/"+k.GetTopic(), 0777); + } + newDataStr := dataStr + "\n" + // file exists case, so just append + hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name()) + + if hdfsFileWriterError != nil || hdfsFileWriter==nil{ + if(hdfsFileWriter==nil){ + slogger.Infof("hdfsFileWriter is NULL !!") + } + slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n", + "/"+k.GetTopic(),hdfsFileWriterError) + panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic())) + } + bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) + if bytesWritten > 0 && error == nil { + slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) + slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) + + if setUpPipeline==false{ + slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ + "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL()) + setUpPipeline = true + } + } else { + slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error) + } + hdfsFileWriter.Close() + + case kafka.Error: + // Errors should generally be considered + // informational, the client will try to + // automatically recover. + // But in this example we choose to terminate + // the application if all brokers are down. + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + if e.Code() == kafka.ErrAllBrokersDown { + run = false + } + + default: + fmt.Printf("Ignored %v\n", e) + } //:: END : Switch between different types of messages that come out of kafka + } // END: select channel + } // END : infinite loop + + fmt.Printf("Closing the consumer") +} + +func cleanup(h *hdfs.FileWriter){ + if h!=nil{ + err := h.Close() + if err!=nil{ + fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error()) + } + } +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json new file mode 100644 index 00000000..9a41d91b --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json @@ -0,0 +1,11 @@ +{"writer": { + "kafkaConfig": { + "broker": "kafka-cluster-kafka-bootstrap:9092", + "group": "grp1", + "topic": "newTopic9" + }, + "hdfsConfig": { + "hdfs_url": "hdfs1-namenode:8020" + } +} +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go new file mode 100644 index 00000000..ac33bc6a --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go @@ -0,0 +1,37 @@ +package utils + +import ( + "os" +) + +// SetHdfsParametersByObjectMap set the value of the hdfs config parameters +// and return HdfsConfig object +func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{ + + hc := HdfsConfig{} + hc.hdfsURL = m["hdfs_url"].(string) + return hc + +} + +// SetHdfsParametersByEnvVariables sets the hdfs parameters +func SetHdfsParametersByEnvVariables() HdfsConfig { + + slogger := GetLoggerInstance() + hdfsConfigObject := HdfsConfig{ + hdfsURL: os.Getenv("HDFS_URL"), + } + slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL) + return hdfsConfigObject + +} + +// HdfsConfig contains hdfs related config items +type HdfsConfig struct { + hdfsURL string +} + +// GetHdfsURL returns HdfsURL +func (h HdfsConfig) GetHdfsURL() string { + return h.hdfsURL +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go new file mode 100644 index 00000000..1a93a5ad --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go @@ -0,0 +1,33 @@ +package utils + +import ( + "fmt" + "github.com/colinmarc/hdfs" + //"sync" + //"go.uber.org/zap" +) + + +//var clientOnce sync.Once +//var hdfsClient *hdfs.Client +//var slogger *zap.SugaredLogger + + +//GetHdfsClientInstance returns a singleton hdfsClient instance +// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){ +// clientOnce.Do(func(){ +// hdfsClient = createHdfsClient(hdfsURL) +// }) +// return hdfsClient +// } + +//CreateHdfsClient creates a hdfs client and returns hdfs client +func CreateHdfsClient(hdfsURL string) (*hdfs.Client){ + slogger := GetLoggerInstance() + hdfsClient, hdfsConnectError := hdfs.New(hdfsURL) + if hdfsConnectError !=nil { + slogger.Fatalf(":::Error in create hdfsClient::: %v", hdfsConnectError) + fmt.Printf("::Unable to initialize hdfsURL, check logs") + } + return hdfsClient +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go new file mode 100644 index 00000000..080bfd4b --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go @@ -0,0 +1,55 @@ +package utils + + +import ( + "os" +) + +// SetKafkaParametersByObjectMap sets the value of the kafka parameters +// and sets the KafkaConfig object +func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig { + kc := KafkaConfig{} + kc.broker = m["broker"].(string) + kc.group = m["group"].(string) + kc.topic = m["topic"].(string) + + return kc +} + +// SetKafkaParametersByEnvVariables sets the kafka parameters +func SetKafkaParametersByEnvVariables() KafkaConfig { + slogger := GetLoggerInstance() + + kafkaConfigObject := KafkaConfig{ + broker: os.Getenv("BROKER"), + group: os.Getenv("GROUP"), + topic: os.Getenv("TOPIC"), + } + slogger.Infof("::broker:: %s", kafkaConfigObject.broker) + slogger.Infof("::group:: %s", kafkaConfigObject.group) + slogger.Infof("::topic:: %s", kafkaConfigObject.topic) + + return kafkaConfigObject +} + +// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time +type KafkaConfig struct { + broker string + group string + topic string +} + +// GetBroker returns kafka broker configured +func (k KafkaConfig) GetBroker() string { + return k.broker +} + +// GetGroup returns kafka group configured +func (k KafkaConfig) GetGroup() string { + return k.group +} + +// GetTopic returns kafka topic configured +func (k KafkaConfig) GetTopic() string { + return k.topic +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go new file mode 100644 index 00000000..0f72e718 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go @@ -0,0 +1,32 @@ +package utils + +import ( + "go.uber.org/zap" + "fmt" + "sync" +) + + + +var logOnce sync.Once +var logger *zap.SugaredLogger + +//GetLoggerInstance returns a singleton instance of logger +func GetLoggerInstance() (*zap.SugaredLogger){ + logOnce.Do(func(){ + logger = createLogger() + }) + return logger +} + + +//createLogger returns a SugaredLogger, sugaredLogger can be directly used to generate logs +func createLogger() (*zap.SugaredLogger){ + logger, err := zap.NewDevelopment() + if err != nil { + fmt.Printf("can't initialize zap logger: %v", err) + } + defer logger.Sync() + slogger := logger.Sugar() + return slogger +} \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go new file mode 100644 index 00000000..bfab64e6 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go @@ -0,0 +1,28 @@ +package utils + +import ( + "os" + "io/ioutil" +) + + +//ReadJSON reads the content of a give file and returns as a string +// used for small config files only. +func ReadJSON(path string) string { + slogger := GetLoggerInstance() + jsonFile, err := os.Open(path) + if err!=nil{ + //fmt.Print(err) + slogger.Errorf("Unable to open file: %s", path) + slogger.Errorf("Error::: %s", err) + + }else{ + slogger.Infof("Successfully opened config.json") + } + + defer jsonFile.Close() + byteValue, _ := ioutil.ReadAll(jsonFile) + s := string(byteValue) + return s +} + diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml new file mode 100644 index 00000000..c207967f --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml @@ -0,0 +1,19 @@ +apiVersion: skaffold/v1beta6 +kind: Config +build: + tagPolicy: + sha256: {} + artifacts: + - context: . + image: hdfs-writer + local: + useBuildkit: false + useDockerCLI: false +deploy: + kubectl: + manifests: + - kubernetes-manifests/** +profiles: + - name: cloudbuild + build: + googleCloudBuild: {} -- cgit 1.2.3-korg