diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go')
-rw-r--r-- | vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go | 153 |
1 files changed, 98 insertions, 55 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go index c5dbd3cd..2e192e99 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go @@ -2,52 +2,84 @@ package pipeline import ( "fmt" - "os" "github.com/colinmarc/hdfs" "github.com/confluentinc/confluent-kafka-go/kafka" + guuid "github.com/google/uuid" + "os" + "sync" + utils "hdfs-writer/pkg/utils" - ) -// BuildWriterPipeline builds a pipeline -func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) { - slogger := utils.GetLoggerInstance() +var slogger = utils.GetLoggerInstance() + +// ChannelMap is the global map to store writerNames as key and channels as values. +//var ChannelMap =make(map[string]chan struct{}) +var ChannelMap = make(map[string]chan bool) + +// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a +var Wg sync.WaitGroup +var writerStr = "writer" + +// CreatePipeline initiates the building of a pipeline +func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string { + //pipelineChan := make(chan struct{}) + pipelineChan := make(chan bool) + uuid := guuid.New().String() + slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") + writerName := writerStr + "-" + uuid[len(uuid)-4:] + slogger.Infof("::writerName:: %s ", writerName) + ChannelMap[writerName] = pipelineChan + + //Every create request shall add 1 to the WaitGroup + Wg.Add(1) + // envoke the go routine to build pipeline + go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName]) + return writerName +} + +// buildWriterPipeline builds a pipeline +func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) { + topics := make([]string, 1) - topics[0] = k.GetTopic() - - c,err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": k.GetBroker(), + topics[0] = k.Topic + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": k.Broker, "broker.address.family": "v4", - "group.id": k.GetGroup(), + "group.id": k.Group, "session.timeout.ms": 6000, "auto.offset.reset": "earliest"}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) - os.Exit(1) + slogger.Info("Failed to create consumer: %s", err.Error()) + delete(ChannelMap, writerName) + Wg.Done() + return } fmt.Printf("Created Consumer %v\n", c) err = c.SubscribeTopics(topics, nil) - run := true setUpPipeline := false var hdfsFileWriter *hdfs.FileWriter var hdfsFileWriterError error // HDFS CLIENT CREATION //client := utils.GetHdfsClientInstance(h.GetHdfsURL()) - client := utils.CreateHdfsClient(h.GetHdfsURL()) - + client := utils.CreateHdfsClient(h.HdfsURL) - for run==true { + for { select { case sig := <-sigchan: + defer Wg.Done() client.Close() - if hdfsFileWriter!=nil{ + if hdfsFileWriter != nil { cleanup(hdfsFileWriter) } slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName) - run = false + close(sigchan) + return default: //slogger.Info("Running default option ....") ev := c.Poll(100) @@ -55,51 +87,52 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str continue } //:: BEGIN : Switch between different types of messages that come out of kafka - switch e := ev.(type){ + switch e := ev.(type) { case *kafka.Message: slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value) dataStr := string(e.Value) slogger.Infof("byte array ::: %s", []byte(dataStr)) - fileInfo, fileInfoError := client.Stat("/" + k.GetTopic()) + fileInfo, fileInfoError := client.Stat("/" + k.Topic) // create file if it doesnt exists already if fileInfoError != nil { - slogger.Infof("Error::: %s",fileInfoError) - slogger.Infof("Creating file::: %s", "/"+k.GetTopic()) - hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic()) - if hdfsFileWriterError !=nil { + slogger.Infof("Error::: %s", fileInfoError) + slogger.Infof("Creating file::: %s", "/"+k.Topic) + hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic) + if hdfsFileWriterError != nil { slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s", - "/"+k.GetTopic(), hdfsFileWriterError.Error()) - panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic())) + "/"+k.Topic, hdfsFileWriterError.Error()) + continue } - _= client.Chmod("/"+k.GetTopic(), 0777); + _ = client.Chmod("/"+k.Topic, 0777) } newDataStr := dataStr + "\n" // file exists case, so just append - hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name()) - - if hdfsFileWriterError != nil || hdfsFileWriter==nil{ - if(hdfsFileWriter==nil){ + hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name()) + + if hdfsFileWriterError != nil { + if hdfsFileWriter == nil { slogger.Infof("hdfsFileWriter is NULL !!") } slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n", - "/"+k.GetTopic(),hdfsFileWriterError) - panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic())) - } - bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) - if bytesWritten > 0 && error == nil { - slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) - slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) - - if setUpPipeline==false{ - slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ - "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL()) - setUpPipeline = true - } + "/"+k.Topic, hdfsFileWriterError) + continue } else { - slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error) + bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) + if bytesWritten > 0 && error == nil { + slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) + slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) + + if setUpPipeline == false { + slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ + "watching for more messages.. ", k.Topic, h.HdfsURL) + setUpPipeline = true + } + hdfsFileWriter.Close() + } else { + slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error) + } } - hdfsFileWriter.Close() - + case kafka.Error: // Errors should generally be considered // informational, the client will try to @@ -108,23 +141,33 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str // the application if all brokers are down. fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) if e.Code() == kafka.ErrAllBrokersDown { - run = false + return } - + default: fmt.Printf("Ignored %v\n", e) } //:: END : Switch between different types of messages that come out of kafka } // END: select channel } // END : infinite loop - - fmt.Printf("Closing the consumer") } -func cleanup(h *hdfs.FileWriter){ - if h!=nil{ +func cleanup(h *hdfs.FileWriter) { + if h != nil { err := h.Close() - if err!=nil{ - fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error()) + if err != nil { + fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error()) } + } -}
\ No newline at end of file + fmt.Printf("\n:::Clean up executed ::: \n") +} + +// DeletePipeline deletes a writer pipeline +func DeletePipeline(writerName string) { + slogger.Infof("::Writer to be closed:: %s", writerName) + toBeClosedChannel := ChannelMap[writerName] + toBeClosedChannel <- true + // deleting the channel from ChannelMap after closure to + // avoid closing the closed channel + delete(ChannelMap, writerName) +} |