Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go')
-rw-r--r--  vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go  154
 1 file changed, 87 insertions(+), 67 deletions(-)
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
index 65021b4a..568cb8c6 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -1,96 +1,116 @@
package handler
-
import (
- "fmt"
- "net/http"
- "io/ioutil"
"encoding/json"
+ "fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
+ "net/http"
+ "strings"
- guuid "github.com/google/uuid"
pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
-
var slogger = utils.GetLoggerInstance()
-// ChannelMap is the global map to store writerNames as key and channels as values.
-var ChannelMap =make(map[string]chan struct{})
-
-
-// This is a sample test request handler
-func testFunc(w http.ResponseWriter, r *http.Request){
- slogger.Info("Invoking testFunc ...")
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,"HTTP Test successful ")
-}
// CreateRouter returns a http handler for the registered URLs
-func CreateRouter() http.Handler{
+func CreateRouter() http.Handler {
router := mux.NewRouter().StrictSlash(true)
slogger.Info("Created router ...")
- router.HandleFunc("/test", testFunc).Methods("GET")
- router.HandleFunc("/createWriter", createWriter).Methods("POST")
- router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writer", createWriter).Methods("POST")
+ router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
return router
}
-
// CreateWriter creates a pipeline
-func createWriter(w http.ResponseWriter, r *http.Request){
- reqBody, _ := ioutil.ReadAll(r.Body)
+func createWriter(w http.ResponseWriter, r *http.Request) {
+ if r.Body == nil {
+ http.Error(w, "Empty body", http.StatusBadRequest)
+ return
+ }
+ reqBody, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
slogger.Info(string(reqBody))
- var results map[string]interface{}
- json.Unmarshal(reqBody, &results)
- if len(results)==0{
- slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ var results utils.Pipeline
+ err = json.Unmarshal(reqBody, &results)
+ if err != nil {
+ unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
+ fmt.Fprintln(w, unableToParse)
+ return
}
- writerStr := "writer"
- writer := results[writerStr].(map[string]interface{})
- kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
- hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
-
- kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
- hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
-
- //populate the channelMap
- pipelineChan := make(chan struct{})
- slogger.Infof("Channel created by post :: %v", pipelineChan)
- uuid := guuid.New().String()
- //slogger.Infof("guuid :: %s",uuid)
- slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
- writerName := writerStr+"-"+uuid[len(uuid)-4:]
- slogger.Infof("::writerName:: %s ",writerName)
- ChannelMap[writerName] = pipelineChan
-
- // envoke the go routine to build pipeline
- go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ if !validateKafkaConfig(results.KafkaConfiguration) {
+ http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ if !validateHdfsConfig(results.HdfsConfiguration) {
+ http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,successMessage)
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintln(w, successMessage)
}
-
// deleteWriter deletes a given writer pipeline
-func deleteWriter(w http.ResponseWriter, r *http.Request){
+func deleteWriter(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
writerName := vars["writerName"]
- if _, keyExists := ChannelMap[writerName]; keyExists{
- slogger.Infof("::Writer to be closed:: %s",writerName)
- toBeClosedChannel := ChannelMap[writerName]
- close(toBeClosedChannel)
- // deleting the channel from ChannelMap after closure to
- // avoid closing the closed channel
- delete(ChannelMap, writerName)
-
+ if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
+ pipeline.DeletePipeline(writerName)
w.WriteHeader(http.StatusOK)
- deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
- fmt.Fprintln(w,deleteMessage)
-
- }else{
- notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
- fmt.Fprintln(w,notFoundMessage)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
+ fmt.Fprintln(w, deleteMessage)
+ } else {
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
+ fmt.Fprintln(w, notFoundMessage)
+
+ }
+}
+
+// validateKafkaConfig validates the kafka config items and returns true if they are valid.
+func validateKafkaConfig(k utils.KafkaConfig) bool {
+ if strings.TrimSpace(k.Broker) == "" {
+ fmt.Println("Broker is empty!")
+ slogger.Infof("Broker is empty!")
+ return false
+ }
+ if strings.TrimSpace(k.Group) == "" {
+ fmt.Println("Group is empty!")
+ slogger.Infof("Group is empty!")
+ return false
}
-
-}
\ No newline at end of file
+ if strings.TrimSpace(k.Topic) == "" {
+ fmt.Println("Topic is empty!")
+ slogger.Infof("Topic is empty!")
+ return false
+ }
+ return true
+}
+
+// validateHdfsConfig validates the hdfs config items and returns true if they are valid.
+func validateHdfsConfig(h utils.HdfsConfig) bool {
+ if strings.TrimSpace(h.HdfsURL) == "" {
+ fmt.Println("HdfsURL is empty!")
+ return false
+ }
+ return true
+}
+
+// getAllWriters lists the active writers
+func getAllWriters(w http.ResponseWriter, r *http.Request) {
+ slogger.Info("Listing all the writers ...")
+ var listOfWriters []string
+ for k := range pipeline.ChannelMap {
+ listOfWriters = append(listOfWriters, k)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
+}
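
For reference, below is a minimal client sketch against the endpoints this change introduces (POST /v1/writer, GET /v1/writers, DELETE /v1/writer/{writerName}). The listen address, the writer name, and the JSON field names in the payload are assumptions for illustration only: the real request shape is defined by utils.Pipeline, utils.KafkaConfig and utils.HdfsConfig, which are not part of this diff.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumed payload shape and field names; the real JSON tags live in
	// utils.Pipeline, utils.KafkaConfig and utils.HdfsConfig (not shown in this diff).
	payload := []byte(`{
	    "writer": {
	        "kafkaConfig": {"broker": "kafka:9092", "group": "grp1", "topic": "metrics"},
	        "hdfsConfig":  {"hdfs_url": "hdfs-namenode:8020"}
	    }
	}`)

	// Assumed listen address of the hdfs-writer service.
	base := "http://localhost:9393"

	// Create a writer pipeline.
	resp, err := http.Post(base+"/v1/writer", "application/json", bytes.NewBuffer(payload))
	if err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))

	// List the active writers.
	resp, err = http.Get(base + "/v1/writers")
	if err != nil {
		panic(err)
	}
	body, _ = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))

	// Delete a writer by name ("writer-abcd" is a placeholder; the actual
	// name is returned by the create call above).
	req, _ := http.NewRequest(http.MethodDelete, base+"/v1/writer/writer-abcd", nil)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	body, _ = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))
}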