diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go')
-rw-r--r-- | vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go | 154 |
1 files changed, 87 insertions, 67 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go index 65021b4a..568cb8c6 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go @@ -1,96 +1,116 @@ package handler - import ( - "fmt" - "net/http" - "io/ioutil" "encoding/json" + "fmt" "github.com/gorilla/mux" + "io/ioutil" + "net/http" + "strings" - guuid "github.com/google/uuid" pipeline "hdfs-writer/pkg/pipeline" utils "hdfs-writer/pkg/utils" ) - var slogger = utils.GetLoggerInstance() -// ChannelMap is the global map to store writerNames as key and channels as values. -var ChannelMap =make(map[string]chan struct{}) - - -// This is a sample test request handler -func testFunc(w http.ResponseWriter, r *http.Request){ - slogger.Info("Invoking testFunc ...") - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w,"HTTP Test successful ") -} // CreateRouter returns a http handler for the registered URLs -func CreateRouter() http.Handler{ +func CreateRouter() http.Handler { router := mux.NewRouter().StrictSlash(true) slogger.Info("Created router ...") - router.HandleFunc("/test", testFunc).Methods("GET") - router.HandleFunc("/createWriter", createWriter).Methods("POST") - router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE") + router.HandleFunc("/v1/writer", createWriter).Methods("POST") + router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE") + router.HandleFunc("/v1/writers", getAllWriters).Methods("GET") return router } - // CreateWriter creates a pipeline -func createWriter(w http.ResponseWriter, r *http.Request){ - reqBody, _ := ioutil.ReadAll(r.Body) +func createWriter(w http.ResponseWriter, r *http.Request) { + if r.Body == nil { + http.Error(w, "Empty body", http.StatusBadRequest) + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } slogger.Info(string(reqBody)) - var results map[string]interface{} - json.Unmarshal(reqBody, &results) - if len(results)==0{ - slogger.Fatalf("Unable to read from the config json file, unable to create configObject map") + var results utils.Pipeline + error := json.Unmarshal(reqBody, &results) + if error != nil { + unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error()) + fmt.Fprintln(w, unableToParse) + return } - writerStr := "writer" - writer := results[writerStr].(map[string]interface{}) - kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{}) - hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{}) - - kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj) - hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj) - - //populate the channelMap - pipelineChan := make(chan struct{}) - slogger.Infof("Channel created by post :: %v", pipelineChan) - uuid := guuid.New().String() - //slogger.Infof("guuid :: %s",uuid) - slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") - writerName := writerStr+"-"+uuid[len(uuid)-4:] - slogger.Infof("::writerName:: %s ",writerName) - ChannelMap[writerName] = pipelineChan - - // envoke the go routine to build pipeline - go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName]) + if validateKafkaConfig(results.KafkaConfiguration) == false { + http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest) + return + } + if validateHdfsConfig(results.HdfsConfiguration) == false { + http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest) + return + } + writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration) successMessage := fmt.Sprintf("Created the writer ::%s", writerName) - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w,successMessage) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + fmt.Fprintln(w, successMessage) } - // deleteWriter deletes a given writer pipeline -func deleteWriter(w http.ResponseWriter, r *http.Request){ +func deleteWriter(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) writerName := vars["writerName"] - if _, keyExists := ChannelMap[writerName]; keyExists{ - slogger.Infof("::Writer to be closed:: %s",writerName) - toBeClosedChannel := ChannelMap[writerName] - close(toBeClosedChannel) - // deleting the channel from ChannelMap after closure to - // avoid closing the closed channel - delete(ChannelMap, writerName) - + if _, keyExists := pipeline.ChannelMap[writerName]; keyExists { + pipeline.DeletePipeline(writerName) w.WriteHeader(http.StatusOK) - deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName) - fmt.Fprintln(w,deleteMessage) - - }else{ - notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName) - fmt.Fprintln(w,notFoundMessage) + deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName) + fmt.Fprintln(w, deleteMessage) + } else { + notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName) + fmt.Fprintln(w, notFoundMessage) + + } +} + +// validateKafkaConfig validates the kafka config items and returns true if they are valid. +func validateKafkaConfig(k utils.KafkaConfig) bool { + if strings.TrimSpace(k.Broker) == "" { + fmt.Println("Broker is empty!") + slogger.Infof("Broker is empty!") + return false + } + if strings.TrimSpace(k.Group) == "" { + fmt.Println("Group is empty!") + slogger.Infof("Group is empty!") + return false } - -}
\ No newline at end of file + if strings.TrimSpace(k.Topic) == "" { + fmt.Println("Topic is empty!") + slogger.Infof("Topic is empty!") + return false + } + return true +} + +// validateHdfsConfig validates the kafka config items and returns true if they are valid. +func validateHdfsConfig(h utils.HdfsConfig) bool { + if strings.TrimSpace(h.HdfsURL) == "" { + fmt.Println("HdfsURL is empty!") + return false + } + return true +} + +// getAllWriters list down the active writers +func getAllWriters(w http.ResponseWriter, r *http.Request) { + slogger.Info("Listing all the writers ...") + var listOfWriters []string + for k := range pipeline.ChannelMap { + listOfWriters = append(listOfWriters, k) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters))) +} |