diff options
author | 2019-10-15 00:48:18 +0000 | |
---|---|---|
committer | 2019-10-15 13:30:04 +0000 | |
commit | 99f7370360201104ddfc99b5e766b4e32e8524cc (patch) | |
tree | 9e7dbc4db186e85afd0654b64bdb4cabdb7b81ab /vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler | |
parent | 3289af42fd3af32fd07c565d072c65743249ebce (diff) |
HDFSWriter microservice working copy
Issue-ID: ONAPARC-453
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Change-Id: I11c91b642e466763c1ca6f5734bf81fb260e2b39
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler')
-rw-r--r-- | vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go new file mode 100644 index 00000000..65021b4a --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go @@ -0,0 +1,96 @@ +package handler + + +import ( + "fmt" + "net/http" + "io/ioutil" + "encoding/json" + "github.com/gorilla/mux" + + guuid "github.com/google/uuid" + pipeline "hdfs-writer/pkg/pipeline" + utils "hdfs-writer/pkg/utils" +) + + +var slogger = utils.GetLoggerInstance() +// ChannelMap is the global map to store writerNames as key and channels as values. +var ChannelMap =make(map[string]chan struct{}) + + +// This is a sample test request handler +func testFunc(w http.ResponseWriter, r *http.Request){ + slogger.Info("Invoking testFunc ...") + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,"HTTP Test successful ") +} + +// CreateRouter returns a http handler for the registered URLs +func CreateRouter() http.Handler{ + router := mux.NewRouter().StrictSlash(true) + slogger.Info("Created router ...") + router.HandleFunc("/test", testFunc).Methods("GET") + router.HandleFunc("/createWriter", createWriter).Methods("POST") + router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE") + return router +} + + +// CreateWriter creates a pipeline +func createWriter(w http.ResponseWriter, r *http.Request){ + reqBody, _ := ioutil.ReadAll(r.Body) + slogger.Info(string(reqBody)) + var results map[string]interface{} + json.Unmarshal(reqBody, &results) + if len(results)==0{ + slogger.Fatalf("Unable to read from the config json file, unable to create configObject map") + } + writerStr := "writer" + writer := results[writerStr].(map[string]interface{}) + kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{}) + hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{}) + + kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj) + hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj) + + //populate the channelMap + pipelineChan := make(chan struct{}) + slogger.Infof("Channel created by post :: %v", pipelineChan) + uuid := guuid.New().String() + //slogger.Infof("guuid :: %s",uuid) + slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") + writerName := writerStr+"-"+uuid[len(uuid)-4:] + slogger.Infof("::writerName:: %s ",writerName) + ChannelMap[writerName] = pipelineChan + + // envoke the go routine to build pipeline + go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName]) + successMessage := fmt.Sprintf("Created the writer ::%s", writerName) + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w,successMessage) +} + + +// deleteWriter deletes a given writer pipeline +func deleteWriter(w http.ResponseWriter, r *http.Request){ + vars := mux.Vars(r) + writerName := vars["writerName"] + if _, keyExists := ChannelMap[writerName]; keyExists{ + slogger.Infof("::Writer to be closed:: %s",writerName) + toBeClosedChannel := ChannelMap[writerName] + close(toBeClosedChannel) + // deleting the channel from ChannelMap after closure to + // avoid closing the closed channel + delete(ChannelMap, writerName) + + w.WriteHeader(http.StatusOK) + deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName) + fmt.Fprintln(w,deleteMessage) + + }else{ + notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName) + fmt.Fprintln(w,notFoundMessage) + } + +}
\ No newline at end of file |