aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go')
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go96
1 files changed, 96 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
new file mode 100644
index 00000000..65021b4a
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -0,0 +1,96 @@
+package handler
+
+
+import (
+ "fmt"
+ "net/http"
+ "io/ioutil"
+ "encoding/json"
+ "github.com/gorilla/mux"
+
+ guuid "github.com/google/uuid"
+ pipeline "hdfs-writer/pkg/pipeline"
+ utils "hdfs-writer/pkg/utils"
+)
+
+
+var slogger = utils.GetLoggerInstance()
+// ChannelMap is the global map to store writerNames as key and channels as values.
+var ChannelMap =make(map[string]chan struct{})
+
+
+// This is a sample test request handler
+func testFunc(w http.ResponseWriter, r *http.Request){
+ slogger.Info("Invoking testFunc ...")
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintln(w,"HTTP Test successful ")
+}
+
+// CreateRouter returns a http handler for the registered URLs
+func CreateRouter() http.Handler{
+ router := mux.NewRouter().StrictSlash(true)
+ slogger.Info("Created router ...")
+ router.HandleFunc("/test", testFunc).Methods("GET")
+ router.HandleFunc("/createWriter", createWriter).Methods("POST")
+ router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ return router
+}
+
+
+// CreateWriter creates a pipeline
+func createWriter(w http.ResponseWriter, r *http.Request){
+ reqBody, _ := ioutil.ReadAll(r.Body)
+ slogger.Info(string(reqBody))
+ var results map[string]interface{}
+ json.Unmarshal(reqBody, &results)
+ if len(results)==0{
+ slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ }
+ writerStr := "writer"
+ writer := results[writerStr].(map[string]interface{})
+ kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
+ hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
+
+ kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
+ hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
+
+ //populate the channelMap
+ pipelineChan := make(chan struct{})
+ slogger.Infof("Channel created by post :: %v", pipelineChan)
+ uuid := guuid.New().String()
+ //slogger.Infof("guuid :: %s",uuid)
+ slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+ writerName := writerStr+"-"+uuid[len(uuid)-4:]
+ slogger.Infof("::writerName:: %s ",writerName)
+ ChannelMap[writerName] = pipelineChan
+
+ // envoke the go routine to build pipeline
+ go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintln(w,successMessage)
+}
+
+
+// deleteWriter deletes a given writer pipeline
+func deleteWriter(w http.ResponseWriter, r *http.Request){
+ vars := mux.Vars(r)
+ writerName := vars["writerName"]
+ if _, keyExists := ChannelMap[writerName]; keyExists{
+ slogger.Infof("::Writer to be closed:: %s",writerName)
+ toBeClosedChannel := ChannelMap[writerName]
+ close(toBeClosedChannel)
+ // deleting the channel from ChannelMap after closure to
+ // avoid closing the closed channel
+ delete(ChannelMap, writerName)
+
+ w.WriteHeader(http.StatusOK)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
+ fmt.Fprintln(w,deleteMessage)
+
+ }else{
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
+ fmt.Fprintln(w,notFoundMessage)
+ }
+
+} \ No newline at end of file