Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go')
-rw-r--r--  vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go  130
1 file changed, 130 insertions(+), 0 deletions(-)
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
new file mode 100644
index 00000000..c5dbd3cd
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
@@ -0,0 +1,130 @@
+package pipeline
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/colinmarc/hdfs"
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+
+	utils "hdfs-writer/pkg/utils"
+)
+
+// BuildWriterPipeline consumes messages from the configured Kafka topic and
+// appends each message, one per line, to a file named after the topic in HDFS.
+// It runs until a signal arrives on sigchan.
+func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
+	slogger := utils.GetLoggerInstance()
+	topics := make([]string, 1)
+	topics[0] = k.GetTopic()
+
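+	// Create the consumer. "auto.offset.reset": "earliest" makes a consumer
+	// group with no committed offset start from the beginning of the topic,
+	// so a fresh writer picks up any messages already sitting in Kafka.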
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers":     k.GetBroker(),
+		"broker.address.family": "v4",
+		"group.id":              k.GetGroup(),
+		"session.timeout.ms":    6000,
+		"auto.offset.reset":     "earliest"})
+
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
+		os.Exit(1)
+	}
+	fmt.Printf("Created Consumer %v\n", c)
+
+	if err = c.SubscribeTopics(topics, nil); err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics %v: %s\n", topics, err)
+		os.Exit(1)
+	}
+
+	run := true
+	setUpPipeline := false
+
+	var hdfsFileWriter *hdfs.FileWriter
+	var hdfsFileWriterError error
+
+	// HDFS client creation
+	//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
+	client := utils.CreateHdfsClient(h.GetHdfsURL())
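+	// Presumably utils.GetHdfsClientInstance (commented out above) would share
+	// one client across writers; CreateHdfsClient dials a separate connection
+	// for this writer, which is closed on shutdown below.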
+
+	for run {
+		select {
+		case sig := <-sigchan:
+			client.Close()
+			if hdfsFileWriter != nil {
+				cleanup(hdfsFileWriter)
+			}
+			slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
+			run = false
+		default:
+			//slogger.Info("Running default option ....")
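+			// Poll waits up to 100ms for a message or event; a nil return
+			// means the timeout expired with nothing to process.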
+			ev := c.Poll(100)
+			if ev == nil {
+				continue
+			}
+			// BEGIN: switch between different types of messages that come out of Kafka
+			switch e := ev.(type) {
+			case *kafka.Message:
+				slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
+				dataStr := string(e.Value)
+				slogger.Infof("byte array ::: %s", []byte(dataStr))
+				// Create the file if it doesn't exist already
+				_, fileInfoError := client.Stat("/" + k.GetTopic())
+				if fileInfoError != nil {
+					slogger.Infof("Error::: %s", fileInfoError)
+					slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
+					hdfsFileWriterError = client.CreateEmptyFile("/" + k.GetTopic())
+					if hdfsFileWriterError != nil {
+						slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
+							"/"+k.GetTopic(), hdfsFileWriterError.Error())
+						panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+					}
+					_ = client.Chmod("/"+k.GetTopic(), 0777)
+				}
+				newDataStr := dataStr + "\n"
+				// The file exists at this point either way, so just append
+				hdfsFileWriter, hdfsFileWriterError = client.Append("/" + k.GetTopic())
+
+				if hdfsFileWriterError != nil || hdfsFileWriter == nil {
+					if hdfsFileWriter == nil {
+						slogger.Infof("hdfsFileWriter is NULL !!")
+					}
+					slogger.Infof(":::Appending to file : %s failed:::\nError occurred:::%s\n",
+						"/"+k.GetTopic(), hdfsFileWriterError)
+					panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
+				}
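+				// Each message costs an open-append-close round trip to HDFS;
+				// simple, but worth batching if throughput becomes a concern.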
+				bytesWritten, err := hdfsFileWriter.Write([]byte(newDataStr))
+				if bytesWritten > 0 && err == nil {
+					slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+					slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+
+					if !setUpPipeline {
+						slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+							"watching for more messages.. ", k.GetTopic(), h.GetHdfsURL())
+						setUpPipeline = true
+					}
+				} else {
+					slogger.Infof("::: Unable to write to HDFS\n :::Error:: %s", err)
+				}
+				hdfsFileWriter.Close()
+
+			case kafka.Error:
+				// Errors should generally be considered informational; the client
+				// will try to automatically recover. But in this example we choose
+				// to terminate the application if all brokers are down.
+				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+				if e.Code() == kafka.ErrAllBrokersDown {
+					run = false
+				}
+
+			default:
+				fmt.Printf("Ignored %v\n", e)
+			} // END: switch between different types of messages that come out of Kafka
+		} // END: select channel
+	} // END: infinite loop
+
+	fmt.Println("Closing the consumer")
+	c.Close()
+}
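+
+// A minimal usage sketch (hypothetical caller, for illustration only; kcfg,
+// hcfg and the writer name are assumptions): the pipeline is expected to run
+// as a goroutine and is stopped by signalling, or closing, sigchan:
+//
+//	sigchan := make(chan struct{})
+//	go pipeline.BuildWriterPipeline(kcfg, hcfg, "writer-1", sigchan)
+//	// ... later, to stop the writer:
+//	close(sigchan)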
+
+func cleanup(h *hdfs.FileWriter) {
+	if h != nil {
+		err := h.Close()
+		if err != nil {
+			fmt.Printf(":::Error occurred while closing the hdfs writer::: \n%s", err.Error())
+		}
+	}
+}