summaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler
diff options
context:
space:
mode:
authorRajamohan Raj <rajamohan.raj@intel.com>2019-10-31 23:51:29 +0000
committerMarco Platania <platania@research.att.com>2019-11-04 14:02:09 +0000
commit68d118176bb53c36b31a7060cfa16ad5acac1765 (patch)
treec6dfd74445126f24e8d63457e33720fc5e0d38bb /vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler
parent68042495ef1e1e6dff7fed7fc2691b01cf672fe1 (diff)
HDFS-WriterApp-Fixed all the code review comments
Fixed all the code review comments by Kiran 1. Implemented a boolean channel instead of empty struct channel for signal, use WaitGroup to ensure all writers finish cleanup. 2. Introduce JSON tags for configs 4. Remove all panic and fatalf code to ensure that the app doesn't crash anytime. 5. Remove unnecessary hdfsWriter null checks. 6. Remove the 'run' variable used in the infinite loop, replaced with 'return' Issue-ID: ONAPARC-453 Change-Id: Ic77c59dc75a8898a3cf34999850e6687d40e7faa Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Diffstat (limited to 'vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler')
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go154
1 files changed, 87 insertions, 67 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
index 65021b4a..568cb8c6 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -1,96 +1,116 @@
package handler
-
import (
- "fmt"
- "net/http"
- "io/ioutil"
"encoding/json"
+ "fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
+ "net/http"
+ "strings"
- guuid "github.com/google/uuid"
pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
-
var slogger = utils.GetLoggerInstance()
-// ChannelMap is the global map to store writerNames as key and channels as values.
-var ChannelMap =make(map[string]chan struct{})
-
-
-// This is a sample test request handler
-func testFunc(w http.ResponseWriter, r *http.Request){
- slogger.Info("Invoking testFunc ...")
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,"HTTP Test successful ")
-}
// CreateRouter returns a http handler for the registered URLs
-func CreateRouter() http.Handler{
+func CreateRouter() http.Handler {
router := mux.NewRouter().StrictSlash(true)
slogger.Info("Created router ...")
- router.HandleFunc("/test", testFunc).Methods("GET")
- router.HandleFunc("/createWriter", createWriter).Methods("POST")
- router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writer", createWriter).Methods("POST")
+ router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
return router
}
-
// CreateWriter creates a pipeline
-func createWriter(w http.ResponseWriter, r *http.Request){
- reqBody, _ := ioutil.ReadAll(r.Body)
+func createWriter(w http.ResponseWriter, r *http.Request) {
+ if r.Body == nil {
+ http.Error(w, "Empty body", http.StatusBadRequest)
+ return
+ }
+ reqBody, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
slogger.Info(string(reqBody))
- var results map[string]interface{}
- json.Unmarshal(reqBody, &results)
- if len(results)==0{
- slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ var results utils.Pipeline
+ error := json.Unmarshal(reqBody, &results)
+ if error != nil {
+ unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
+ fmt.Fprintln(w, unableToParse)
+ return
}
- writerStr := "writer"
- writer := results[writerStr].(map[string]interface{})
- kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
- hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
-
- kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
- hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
-
- //populate the channelMap
- pipelineChan := make(chan struct{})
- slogger.Infof("Channel created by post :: %v", pipelineChan)
- uuid := guuid.New().String()
- //slogger.Infof("guuid :: %s",uuid)
- slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
- writerName := writerStr+"-"+uuid[len(uuid)-4:]
- slogger.Infof("::writerName:: %s ",writerName)
- ChannelMap[writerName] = pipelineChan
-
- // envoke the go routine to build pipeline
- go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ if validateKafkaConfig(results.KafkaConfiguration) == false {
+ http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ if validateHdfsConfig(results.HdfsConfiguration) == false {
+ http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,successMessage)
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintln(w, successMessage)
}
-
// deleteWriter deletes a given writer pipeline
-func deleteWriter(w http.ResponseWriter, r *http.Request){
+func deleteWriter(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
writerName := vars["writerName"]
- if _, keyExists := ChannelMap[writerName]; keyExists{
- slogger.Infof("::Writer to be closed:: %s",writerName)
- toBeClosedChannel := ChannelMap[writerName]
- close(toBeClosedChannel)
- // deleting the channel from ChannelMap after closure to
- // avoid closing the closed channel
- delete(ChannelMap, writerName)
-
+ if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
+ pipeline.DeletePipeline(writerName)
w.WriteHeader(http.StatusOK)
- deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
- fmt.Fprintln(w,deleteMessage)
-
- }else{
- notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
- fmt.Fprintln(w,notFoundMessage)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
+ fmt.Fprintln(w, deleteMessage)
+ } else {
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
+ fmt.Fprintln(w, notFoundMessage)
+
+ }
+}
+
+// validateKafkaConfig validates the kafka config items and returns true if they are valid.
+func validateKafkaConfig(k utils.KafkaConfig) bool {
+ if strings.TrimSpace(k.Broker) == "" {
+ fmt.Println("Broker is empty!")
+ slogger.Infof("Broker is empty!")
+ return false
+ }
+ if strings.TrimSpace(k.Group) == "" {
+ fmt.Println("Group is empty!")
+ slogger.Infof("Group is empty!")
+ return false
}
-
-} \ No newline at end of file
+ if strings.TrimSpace(k.Topic) == "" {
+ fmt.Println("Topic is empty!")
+ slogger.Infof("Topic is empty!")
+ return false
+ }
+ return true
+}
+
+// validateHdfsConfig validates the hdfs config items and returns true if they are valid.
+func validateHdfsConfig(h utils.HdfsConfig) bool {
+ if strings.TrimSpace(h.HdfsURL) == "" {
+ fmt.Println("HdfsURL is empty!")
+ return false
+ }
+ return true
+}
+
+// getAllWriters lists the active writers
+func getAllWriters(w http.ResponseWriter, r *http.Request) {
+ slogger.Info("Listing all the writers ...")
+ var listOfWriters []string
+ for k := range pipeline.ChannelMap {
+ listOfWriters = append(listOfWriters, k)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
+}