From 99f7370360201104ddfc99b5e766b4e32e8524cc Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Tue, 15 Oct 2019 00:48:18 +0000 Subject: HDFSWriter microservice working copy Issue-ID: ONAPARC-453 Signed-off-by: Rajamohan Raj Change-Id: I11c91b642e466763c1ca6f5734bf81fb260e2b39 --- .../src/go-hdfs-writer/pkg/pipeline/pipeline.go | 130 +++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline') diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go new file mode 100644 index 00000000..c5dbd3cd --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go @@ -0,0 +1,130 @@ +package pipeline + +import ( + "fmt" + "os" + "github.com/colinmarc/hdfs" + "github.com/confluentinc/confluent-kafka-go/kafka" + utils "hdfs-writer/pkg/utils" + +) + +// BuildWriterPipeline builds a pipeline +func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) { + slogger := utils.GetLoggerInstance() + topics := make([]string, 1) + topics[0] = k.GetTopic() + + c,err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": k.GetBroker(), + "broker.address.family": "v4", + "group.id": k.GetGroup(), + "session.timeout.ms": 6000, + "auto.offset.reset": "earliest"}) + + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) + os.Exit(1) + } + fmt.Printf("Created Consumer %v\n", c) + err = c.SubscribeTopics(topics, nil) + + run := true + setUpPipeline := false + + var hdfsFileWriter *hdfs.FileWriter + var hdfsFileWriterError error + // HDFS CLIENT CREATION + //client := utils.GetHdfsClientInstance(h.GetHdfsURL()) + client := utils.CreateHdfsClient(h.GetHdfsURL()) + + + for run==true { + select { + case sig := <-sigchan: + client.Close() + if hdfsFileWriter!=nil{ + cleanup(hdfsFileWriter) + } + slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName) + run = false + default: + //slogger.Info("Running default option ....") + ev := c.Poll(100) + if ev == nil { + continue + } + //:: BEGIN : Switch between different types of messages that come out of kafka + switch e := ev.(type){ + case *kafka.Message: + slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value) + dataStr := string(e.Value) + slogger.Infof("byte array ::: %s", []byte(dataStr)) + fileInfo, fileInfoError := client.Stat("/" + k.GetTopic()) + // create file if it doesnt exists already + if fileInfoError != nil { + slogger.Infof("Error::: %s",fileInfoError) + slogger.Infof("Creating file::: %s", "/"+k.GetTopic()) + hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic()) + if hdfsFileWriterError !=nil { + slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s", + "/"+k.GetTopic(), hdfsFileWriterError.Error()) + panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic())) + } + _= client.Chmod("/"+k.GetTopic(), 0777); + } + newDataStr := dataStr + "\n" + // file exists case, so just append + hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name()) + + if hdfsFileWriterError != nil || hdfsFileWriter==nil{ + if(hdfsFileWriter==nil){ + slogger.Infof("hdfsFileWriter is NULL !!") + } + slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n", + "/"+k.GetTopic(),hdfsFileWriterError) + panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic())) + } + bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) + if bytesWritten > 0 && error == nil { + slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) + slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) + + if setUpPipeline==false{ + slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ + "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL()) + setUpPipeline = true + } + } else { + slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error) + } + hdfsFileWriter.Close() + + case kafka.Error: + // Errors should generally be considered + // informational, the client will try to + // automatically recover. + // But in this example we choose to terminate + // the application if all brokers are down. + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + if e.Code() == kafka.ErrAllBrokersDown { + run = false + } + + default: + fmt.Printf("Ignored %v\n", e) + } //:: END : Switch between different types of messages that come out of kafka + } // END: select channel + } // END : infinite loop + + fmt.Printf("Closing the consumer") +} + +func cleanup(h *hdfs.FileWriter){ + if h!=nil{ + err := h.Close() + if err!=nil{ + fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error()) + } + } +} \ No newline at end of file -- cgit