From 99f7370360201104ddfc99b5e766b4e32e8524cc Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Tue, 15 Oct 2019 00:48:18 +0000 Subject: HDFSWriter microservice working copy Issue-ID: ONAPARC-453 Signed-off-by: Rajamohan Raj Change-Id: I11c91b642e466763c1ca6f5734bf81fb260e2b39 --- .../src/go-hdfs-writer/pkg/handler/handler.go | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler') diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go new file mode 100644 index 00000000..65021b4a --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go @@ -0,0 +1,96 @@ +package handler + + +import ( + "fmt" + "net/http" + "io/ioutil" + "encoding/json" + "github.com/gorilla/mux" + + guuid "github.com/google/uuid" + pipeline "hdfs-writer/pkg/pipeline" + utils "hdfs-writer/pkg/utils" +) + + +var slogger = utils.GetLoggerInstance() +// ChannelMap is the global map to store writerNames as key and channels as values. +var ChannelMap =make(map[string]chan struct{}) + + +// This is a sample test request handler +func testFunc(w http.ResponseWriter, r *http.Request){ + slogger.Info("Invoking testFunc ...") + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,"HTTP Test successful ") +} + +// CreateRouter returns a http handler for the registered URLs +func CreateRouter() http.Handler{ + router := mux.NewRouter().StrictSlash(true) + slogger.Info("Created router ...") + router.HandleFunc("/test", testFunc).Methods("GET") + router.HandleFunc("/createWriter", createWriter).Methods("POST") + router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE") + return router +} + + +// CreateWriter creates a pipeline +func createWriter(w http.ResponseWriter, r *http.Request){ + reqBody, _ := ioutil.ReadAll(r.Body) + slogger.Info(string(reqBody)) + var results map[string]interface{} + json.Unmarshal(reqBody, &results) + if len(results)==0{ + slogger.Fatalf("Unable to read from the config json file, unable to create configObject map") + } + writerStr := "writer" + writer := results[writerStr].(map[string]interface{}) + kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{}) + hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{}) + + kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj) + hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj) + + //populate the channelMap + pipelineChan := make(chan struct{}) + slogger.Infof("Channel created by post :: %v", pipelineChan) + uuid := guuid.New().String() + //slogger.Infof("guuid :: %s",uuid) + slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") + writerName := writerStr+"-"+uuid[len(uuid)-4:] + slogger.Infof("::writerName:: %s ",writerName) + ChannelMap[writerName] = pipelineChan + + // envoke the go routine to build pipeline + go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName]) + successMessage := fmt.Sprintf("Created the writer ::%s", writerName) + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,successMessage) +} + + +// deleteWriter deletes a given writer pipeline +func deleteWriter(w http.ResponseWriter, r *http.Request){ + vars := mux.Vars(r) + writerName := vars["writerName"] + if _, keyExists := ChannelMap[writerName]; keyExists{ + slogger.Infof("::Writer to be closed:: %s",writerName) + toBeClosedChannel := ChannelMap[writerName] + close(toBeClosedChannel) + // deleting the channel from ChannelMap after closure to + // avoid closing the closed channel + delete(ChannelMap, writerName) + + w.WriteHeader(http.StatusOK) + deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName) + fmt.Fprintln(w,deleteMessage) + + }else{ + notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName) + fmt.Fprintln(w,notFoundMessage) + } + +} \ No newline at end of file -- cgit