// path: root/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
// blob: 65021b4a0ccdb96a3f22a14f273575d1af5e9df9 (plain)

package handler


import (
	"fmt"
	"net/http"
	"io/ioutil"
	"encoding/json"
	"github.com/gorilla/mux"

	guuid "github.com/google/uuid"
	pipeline "hdfs-writer/pkg/pipeline"
	utils "hdfs-writer/pkg/utils"
)


var (
	// slogger is the package-wide logger obtained from the utils package.
	slogger = utils.GetLoggerInstance()

	// ChannelMap is the global registry of running writers: each key is a
	// writerName and each value is the channel handed to that writer's
	// pipeline. createWriter adds entries; deleteWriter closes the channel
	// and removes the entry.
	//
	// NOTE(review): the handlers in this file read and write this map without
	// any synchronization, while net/http serves requests on separate
	// goroutines — confirm whether a lock is needed.
	ChannelMap = make(map[string]chan struct{})
)


// testFunc is a sample request handler used to check that the HTTP server is
// reachable. It logs the invocation and answers 200 OK with a fixed text body.
func testFunc(w http.ResponseWriter, r *http.Request) {
	const reply = "HTTP Test successful "

	slogger.Info("Invoking testFunc ...")
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, reply+"\n")
}

// CreateRouter returns a http handler for the registered URLs
func CreateRouter() http.Handler{
	router := mux.NewRouter().StrictSlash(true)
	slogger.Info("Created router ...")
	router.HandleFunc("/test", testFunc).Methods("GET")
	router.HandleFunc("/createWriter", createWriter).Methods("POST")
	router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
	return router
}


// createWriter handles POST /createWriter by starting a new writer pipeline.
//
// The request body must be a JSON object holding a "writer" object, which in
// turn holds a "kafkaConfig" object and an "hdfsConfig" object. Those two
// objects are converted with utils.SetKafkaParametersByObjectMap and
// utils.SetHdfsParametersByObjectMap, a fresh channel is registered in
// ChannelMap under a generated name of the form "writer-XXXX" (the last four
// characters of a random UUID), and pipeline.BuildWriterPipeline is started
// in its own goroutine with that channel.
//
// Responses:
//   - 200 with "Created the writer ::<name>" on success.
//   - 400 when the body cannot be read, is not a non-empty JSON object, or is
//     missing any of the required nested objects.
//   - 500 when no unused writer name could be generated.
//
// A malformed request is reported to the client instead of being passed to
// slogger.Fatalf or to an unchecked type assertion, so one bad request can
// no longer take the handler (or, presumably, the whole process) down.
func createWriter(w http.ResponseWriter, r *http.Request) {
	reqBody, err := ioutil.ReadAll(r.Body)
	if err != nil {
		slogger.Infof("Unable to read the request body :: %v", err)
		http.Error(w, "Unable to read the request body", http.StatusBadRequest)
		return
	}
	slogger.Info(string(reqBody))

	var results map[string]interface{}
	if err := json.Unmarshal(reqBody, &results); err != nil || len(results) == 0 {
		slogger.Infof("Unable to read from the config json file, unable to create configObject map :: %v", err)
		http.Error(w, "Request body must be a non-empty JSON object", http.StatusBadRequest)
		return
	}

	// Use comma-ok assertions: the body is client-controlled, so any of these
	// keys may be absent or hold a non-object value.
	writerStr := "writer"
	writer, ok := results[writerStr].(map[string]interface{})
	if !ok {
		http.Error(w, `Request body must contain a "writer" object`, http.StatusBadRequest)
		return
	}
	kafkaConfigMapObj, ok := writer["kafkaConfig"].(map[string]interface{})
	if !ok {
		http.Error(w, `"writer" must contain a "kafkaConfig" object`, http.StatusBadRequest)
		return
	}
	hdfsConfigObj, ok := writer["hdfsConfig"].(map[string]interface{})
	if !ok {
		http.Error(w, `"writer" must contain an "hdfsConfig" object`, http.StatusBadRequest)
		return
	}

	kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
	hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)

	// populate the channelMap
	pipelineChan := make(chan struct{})
	slogger.Infof("Channel created by post :: %v", pipelineChan)

	// Only four characters of the UUID are kept, so names can repeat. Retry a
	// few times rather than overwrite the channel of a writer that is still
	// registered, which would make that writer impossible to delete.
	const maxNameAttempts = 16
	writerName := ""
	for attempt := 0; attempt < maxNameAttempts; attempt++ {
		uuid := guuid.New().String()
		candidate := writerStr + "-" + uuid[len(uuid)-4:]
		if _, taken := ChannelMap[candidate]; !taken {
			writerName = candidate
			break
		}
	}
	if writerName == "" {
		slogger.Infof("Unable to generate an unused writer name after %d attempts", maxNameAttempts)
		http.Error(w, "Unable to allocate a unique writer name", http.StatusInternalServerError)
		return
	}

	slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
	slogger.Infof("::writerName:: %s ", writerName)
	ChannelMap[writerName] = pipelineChan

	// invoke the go routine to build pipeline
	go pipeline.BuildWriterPipeline(kc, hc, writerName, pipelineChan)

	successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
	w.WriteHeader(http.StatusOK)
	fmt.Fprintln(w, successMessage)
}


// deleteWriter handles DELETE /deleteWriter/{writerName}.
//
// It looks the writerName path variable up in ChannelMap. When the writer is
// registered, its channel is closed (presumably the stop signal for the
// pipeline goroutine started by createWriter — confirm in the pipeline
// package), the entry is removed from ChannelMap, and the handler answers
// 200 with "Deleted writer :: <name>". When the name is unknown the handler
// answers 404 with "Could not find writer :: <name>", so that clients can
// tell a miss from a successful delete.
func deleteWriter(w http.ResponseWriter, r *http.Request) {
	writerName := mux.Vars(r)["writerName"]

	toBeClosedChannel, keyExists := ChannelMap[writerName]
	if !keyExists {
		// The status must be set before the body is written; otherwise
		// net/http sends an implicit 200 OK.
		w.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(w, "Could not find writer :: %s\n", writerName)
		return
	}

	slogger.Infof("::Writer to be closed:: %s", writerName)
	close(toBeClosedChannel)
	// deleting the channel from ChannelMap after closure to
	// avoid closing the closed channel
	delete(ChannelMap, writerName)

	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "Deleted writer :: %s\n", writerName)
}