aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRajamohan Raj <rajamohan.raj@intel.com>2019-10-31 23:51:29 +0000
committerMarco Platania <platania@research.att.com>2019-11-04 14:02:09 +0000
commit68d118176bb53c36b31a7060cfa16ad5acac1765 (patch)
treec6dfd74445126f24e8d63457e33720fc5e0d38bb
parent68042495ef1e1e6dff7fed7fc2691b01cf672fe1 (diff)
HDFS-WriterApp-Fixed all the code review comments
Fixed all the code review comments by Kiran 1. Implemented a boolean channel instead of empty struct channel for signal, use WaitGroup to ensure all writers finish cleanup. 2. Introduce JSON tags for configs 4. remove all panic and fatalf code to ensure that the app doesnt crash anytime. 5. remove unneccessary hdfsWriter null checks. 6.remove the 'run' variable used in the infinite loop, replaced with 'return' Issue-ID: ONAPARC-453 Change-Id: Ic77c59dc75a8898a3cf34999850e6687d40e7faa Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile2
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go20
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go154
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go153
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json3
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go37
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go15
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go55
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go19
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml20
10 files changed, 225 insertions, 253 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
index ee476717..dd2b475d 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
@@ -7,7 +7,7 @@ RUN go get -u github.com/go-delve/delve/cmd/dlv
WORKDIR /src/hdfs-writer
RUN mkdir /librdkafka-dir && cd /librdkafka-dir
-RUN git clone https://github.com/edenhill/librdkafka.git && \
+RUN git clone https://github.com/edenhill/librdkafka.git && \
cd librdkafka && \
./configure --prefix /usr && \
make && \
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
index a79a3e06..4f3cfcbc 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
@@ -6,9 +6,9 @@ import (
"net/http"
"os"
"os/signal"
- "time"
handler "hdfs-writer/pkg/handler"
+ pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
@@ -22,23 +22,24 @@ func main() {
Addr: ":9393",
}
- connectionsClose := make(chan struct{})
+ connectionsClose := make(chan bool)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c // function literal waiting to receive Interrupt signal
fmt.Printf(":::Got the kill signal:::")
slogger.Info(":::Got the kill signal:::")
- for eachWriter, eachChannel := range handler.ChannelMap {
+ for eachWriter, eachChannel := range pipeline.ChannelMap {
+ slogger.Info("Begin:: Closing writer goroutines::")
slogger.Infof("Closing writer goroutine :: %s", eachWriter)
- slogger.Infof("eachChannel:: %v", eachChannel)
- close(eachChannel)
- // This wait time ensures that the each of the channel is killed before
- // main routine finishes.
- time.Sleep(time.Second * 5)
+ delete(pipeline.ChannelMap, eachWriter)
+ eachChannel <- true
}
- //once all goroutines are signalled, send close to main thread
+
httpServer.Shutdown(context.Background())
+ /*once all goroutines are signalled and httpServer is shutdown,
+ send close to main thread */
+ connectionsClose <- true
close(connectionsClose)
}()
@@ -47,5 +48,6 @@ func main() {
if err != nil && err != http.ErrServerClosed {
slogger.Fatal(err)
}
+ pipeline.Wg.Wait()
<-connectionsClose //main thread waiting to receive close signal
}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
index 65021b4a..568cb8c6 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -1,96 +1,116 @@
package handler
-
import (
- "fmt"
- "net/http"
- "io/ioutil"
"encoding/json"
+ "fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
+ "net/http"
+ "strings"
- guuid "github.com/google/uuid"
pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
-
var slogger = utils.GetLoggerInstance()
-// ChannelMap is the global map to store writerNames as key and channels as values.
-var ChannelMap =make(map[string]chan struct{})
-
-
-// This is a sample test request handler
-func testFunc(w http.ResponseWriter, r *http.Request){
- slogger.Info("Invoking testFunc ...")
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,"HTTP Test successful ")
-}
// CreateRouter returns a http handler for the registered URLs
-func CreateRouter() http.Handler{
+func CreateRouter() http.Handler {
router := mux.NewRouter().StrictSlash(true)
slogger.Info("Created router ...")
- router.HandleFunc("/test", testFunc).Methods("GET")
- router.HandleFunc("/createWriter", createWriter).Methods("POST")
- router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writer", createWriter).Methods("POST")
+ router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
return router
}
-
// CreateWriter creates a pipeline
-func createWriter(w http.ResponseWriter, r *http.Request){
- reqBody, _ := ioutil.ReadAll(r.Body)
+func createWriter(w http.ResponseWriter, r *http.Request) {
+ if r.Body == nil {
+ http.Error(w, "Empty body", http.StatusBadRequest)
+ return
+ }
+ reqBody, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
slogger.Info(string(reqBody))
- var results map[string]interface{}
- json.Unmarshal(reqBody, &results)
- if len(results)==0{
- slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ var results utils.Pipeline
+ error := json.Unmarshal(reqBody, &results)
+ if error != nil {
+ unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
+ fmt.Fprintln(w, unableToParse)
+ return
}
- writerStr := "writer"
- writer := results[writerStr].(map[string]interface{})
- kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
- hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
-
- kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
- hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
-
- //populate the channelMap
- pipelineChan := make(chan struct{})
- slogger.Infof("Channel created by post :: %v", pipelineChan)
- uuid := guuid.New().String()
- //slogger.Infof("guuid :: %s",uuid)
- slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
- writerName := writerStr+"-"+uuid[len(uuid)-4:]
- slogger.Infof("::writerName:: %s ",writerName)
- ChannelMap[writerName] = pipelineChan
-
- // envoke the go routine to build pipeline
- go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ if validateKafkaConfig(results.KafkaConfiguration) == false {
+ http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ if validateHdfsConfig(results.HdfsConfiguration) == false {
+ http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,successMessage)
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintln(w, successMessage)
}
-
// deleteWriter deletes a given writer pipeline
-func deleteWriter(w http.ResponseWriter, r *http.Request){
+func deleteWriter(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
writerName := vars["writerName"]
- if _, keyExists := ChannelMap[writerName]; keyExists{
- slogger.Infof("::Writer to be closed:: %s",writerName)
- toBeClosedChannel := ChannelMap[writerName]
- close(toBeClosedChannel)
- // deleting the channel from ChannelMap after closure to
- // avoid closing the closed channel
- delete(ChannelMap, writerName)
-
+ if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
+ pipeline.DeletePipeline(writerName)
w.WriteHeader(http.StatusOK)
- deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
- fmt.Fprintln(w,deleteMessage)
-
- }else{
- notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
- fmt.Fprintln(w,notFoundMessage)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
+ fmt.Fprintln(w, deleteMessage)
+ } else {
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
+ fmt.Fprintln(w, notFoundMessage)
+
+ }
+}
+
+// validateKafkaConfig validates the kafka config items and returns true if they are valid.
+func validateKafkaConfig(k utils.KafkaConfig) bool {
+ if strings.TrimSpace(k.Broker) == "" {
+ fmt.Println("Broker is empty!")
+ slogger.Infof("Broker is empty!")
+ return false
+ }
+ if strings.TrimSpace(k.Group) == "" {
+ fmt.Println("Group is empty!")
+ slogger.Infof("Group is empty!")
+ return false
}
-
-} \ No newline at end of file
+ if strings.TrimSpace(k.Topic) == "" {
+ fmt.Println("Topic is empty!")
+ slogger.Infof("Topic is empty!")
+ return false
+ }
+ return true
+}
+
+// validateHdfsConfig validates the kafka config items and returns true if they are valid.
+func validateHdfsConfig(h utils.HdfsConfig) bool {
+ if strings.TrimSpace(h.HdfsURL) == "" {
+ fmt.Println("HdfsURL is empty!")
+ return false
+ }
+ return true
+}
+
+// getAllWriters list down the active writers
+func getAllWriters(w http.ResponseWriter, r *http.Request) {
+ slogger.Info("Listing all the writers ...")
+ var listOfWriters []string
+ for k := range pipeline.ChannelMap {
+ listOfWriters = append(listOfWriters, k)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
index c5dbd3cd..2e192e99 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
@@ -2,52 +2,84 @@ package pipeline
import (
"fmt"
- "os"
"github.com/colinmarc/hdfs"
"github.com/confluentinc/confluent-kafka-go/kafka"
+ guuid "github.com/google/uuid"
+ "os"
+ "sync"
+
utils "hdfs-writer/pkg/utils"
-
)
-// BuildWriterPipeline builds a pipeline
-func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
- slogger := utils.GetLoggerInstance()
+var slogger = utils.GetLoggerInstance()
+
+// ChannelMap is the global map to store writerNames as key and channels as values.
+//var ChannelMap =make(map[string]chan struct{})
+var ChannelMap = make(map[string]chan bool)
+
+// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a
+var Wg sync.WaitGroup
+var writerStr = "writer"
+
+// CreatePipeline initiates the building of a pipeline
+func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
+ //pipelineChan := make(chan struct{})
+ pipelineChan := make(chan bool)
+ uuid := guuid.New().String()
+ slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+ writerName := writerStr + "-" + uuid[len(uuid)-4:]
+ slogger.Infof("::writerName:: %s ", writerName)
+ ChannelMap[writerName] = pipelineChan
+
+ //Every create request shall add 1 to the WaitGroup
+ Wg.Add(1)
+ // envoke the go routine to build pipeline
+ go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName])
+ return writerName
+}
+
+// buildWriterPipeline builds a pipeline
+func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {
+
topics := make([]string, 1)
- topics[0] = k.GetTopic()
-
- c,err := kafka.NewConsumer(&kafka.ConfigMap{
- "bootstrap.servers": k.GetBroker(),
+ topics[0] = k.Topic
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": k.Broker,
"broker.address.family": "v4",
- "group.id": k.GetGroup(),
+ "group.id": k.Group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
- os.Exit(1)
+ slogger.Info("Failed to create consumer: %s", err.Error())
+ delete(ChannelMap, writerName)
+ Wg.Done()
+ return
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
- run := true
setUpPipeline := false
var hdfsFileWriter *hdfs.FileWriter
var hdfsFileWriterError error
// HDFS CLIENT CREATION
//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
- client := utils.CreateHdfsClient(h.GetHdfsURL())
-
+ client := utils.CreateHdfsClient(h.HdfsURL)
- for run==true {
+ for {
select {
case sig := <-sigchan:
+ defer Wg.Done()
client.Close()
- if hdfsFileWriter!=nil{
+ if hdfsFileWriter != nil {
cleanup(hdfsFileWriter)
}
slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
- run = false
+ close(sigchan)
+ return
default:
//slogger.Info("Running default option ....")
ev := c.Poll(100)
@@ -55,51 +87,52 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
continue
}
//:: BEGIN : Switch between different types of messages that come out of kafka
- switch e := ev.(type){
+ switch e := ev.(type) {
case *kafka.Message:
slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
dataStr := string(e.Value)
slogger.Infof("byte array ::: %s", []byte(dataStr))
- fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+ fileInfo, fileInfoError := client.Stat("/" + k.Topic)
// create file if it doesnt exists already
if fileInfoError != nil {
- slogger.Infof("Error::: %s",fileInfoError)
- slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
- hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
- if hdfsFileWriterError !=nil {
+ slogger.Infof("Error::: %s", fileInfoError)
+ slogger.Infof("Creating file::: %s", "/"+k.Topic)
+ hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
+ if hdfsFileWriterError != nil {
slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
- "/"+k.GetTopic(), hdfsFileWriterError.Error())
- panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+ "/"+k.Topic, hdfsFileWriterError.Error())
+ continue
}
- _= client.Chmod("/"+k.GetTopic(), 0777);
+ _ = client.Chmod("/"+k.Topic, 0777)
}
newDataStr := dataStr + "\n"
// file exists case, so just append
- hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
-
- if hdfsFileWriterError != nil || hdfsFileWriter==nil{
- if(hdfsFileWriter==nil){
+ hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())
+
+ if hdfsFileWriterError != nil {
+ if hdfsFileWriter == nil {
slogger.Infof("hdfsFileWriter is NULL !!")
}
slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
- "/"+k.GetTopic(),hdfsFileWriterError)
- panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
- }
- bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
- if bytesWritten > 0 && error == nil {
- slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
- slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
-
- if setUpPipeline==false{
- slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
- "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
- setUpPipeline = true
- }
+ "/"+k.Topic, hdfsFileWriterError)
+ continue
} else {
- slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+ bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
+ if bytesWritten > 0 && error == nil {
+ slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+ slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+
+ if setUpPipeline == false {
+ slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+ "watching for more messages.. ", k.Topic, h.HdfsURL)
+ setUpPipeline = true
+ }
+ hdfsFileWriter.Close()
+ } else {
+ slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error)
+ }
}
- hdfsFileWriter.Close()
-
+
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
@@ -108,23 +141,33 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
// the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
- run = false
+ return
}
-
+
default:
fmt.Printf("Ignored %v\n", e)
} //:: END : Switch between different types of messages that come out of kafka
} // END: select channel
} // END : infinite loop
-
- fmt.Printf("Closing the consumer")
}
-func cleanup(h *hdfs.FileWriter){
- if h!=nil{
+func cleanup(h *hdfs.FileWriter) {
+ if h != nil {
err := h.Close()
- if err!=nil{
- fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+ if err != nil {
+ fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
}
+
}
-} \ No newline at end of file
+ fmt.Printf("\n:::Clean up executed ::: \n")
+}
+
+// DeletePipeline deletes a writer pipeline
+func DeletePipeline(writerName string) {
+ slogger.Infof("::Writer to be closed:: %s", writerName)
+ toBeClosedChannel := ChannelMap[writerName]
+ toBeClosedChannel <- true
+ // deleting the channel from ChannelMap after closure to
+ // avoid closing the closed channel
+ delete(ChannelMap, writerName)
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
index 9a41d91b..055dac1f 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
@@ -1,4 +1,4 @@
-{"writer": {
+{
"kafkaConfig": {
"broker": "kafka-cluster-kafka-bootstrap:9092",
"group": "grp1",
@@ -7,5 +7,4 @@
"hdfsConfig": {
"hdfs_url": "hdfs1-namenode:8020"
}
-}
} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
deleted file mode 100644
index ac33bc6a..00000000
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package utils
-
-import (
- "os"
-)
-
-// SetHdfsParametersByObjectMap set the value of the hdfs config parameters
-// and return HdfsConfig object
-func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{
-
- hc := HdfsConfig{}
- hc.hdfsURL = m["hdfs_url"].(string)
- return hc
-
-}
-
-// SetHdfsParametersByEnvVariables sets the hdfs parameters
-func SetHdfsParametersByEnvVariables() HdfsConfig {
-
- slogger := GetLoggerInstance()
- hdfsConfigObject := HdfsConfig{
- hdfsURL: os.Getenv("HDFS_URL"),
- }
- slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL)
- return hdfsConfigObject
-
-}
-
-// HdfsConfig contains hdfs related config items
-type HdfsConfig struct {
- hdfsURL string
-}
-
-// GetHdfsURL returns HdfsURL
-func (h HdfsConfig) GetHdfsURL() string {
- return h.hdfsURL
-} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
index 1a93a5ad..8edcec19 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
@@ -3,24 +3,9 @@ package utils
import (
"fmt"
"github.com/colinmarc/hdfs"
- //"sync"
- //"go.uber.org/zap"
)
-//var clientOnce sync.Once
-//var hdfsClient *hdfs.Client
-//var slogger *zap.SugaredLogger
-
-
-//GetHdfsClientInstance returns a singleton hdfsClient instance
-// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){
-// clientOnce.Do(func(){
-// hdfsClient = createHdfsClient(hdfsURL)
-// })
-// return hdfsClient
-// }
-
//CreateHdfsClient creates a hdfs client and returns hdfs client
func CreateHdfsClient(hdfsURL string) (*hdfs.Client){
slogger := GetLoggerInstance()
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
deleted file mode 100644
index 080bfd4b..00000000
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package utils
-
-
-import (
- "os"
-)
-
-// SetKafkaParametersByObjectMap sets the value of the kafka parameters
-// and sets the KafkaConfig object
-func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig {
- kc := KafkaConfig{}
- kc.broker = m["broker"].(string)
- kc.group = m["group"].(string)
- kc.topic = m["topic"].(string)
-
- return kc
-}
-
-// SetKafkaParametersByEnvVariables sets the kafka parameters
-func SetKafkaParametersByEnvVariables() KafkaConfig {
- slogger := GetLoggerInstance()
-
- kafkaConfigObject := KafkaConfig{
- broker: os.Getenv("BROKER"),
- group: os.Getenv("GROUP"),
- topic: os.Getenv("TOPIC"),
- }
- slogger.Infof("::broker:: %s", kafkaConfigObject.broker)
- slogger.Infof("::group:: %s", kafkaConfigObject.group)
- slogger.Infof("::topic:: %s", kafkaConfigObject.topic)
-
- return kafkaConfigObject
-}
-
-// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time
-type KafkaConfig struct {
- broker string
- group string
- topic string
-}
-
-// GetBroker returns kafka broker configured
-func (k KafkaConfig) GetBroker() string {
- return k.broker
-}
-
-// GetGroup returns kafka group configured
-func (k KafkaConfig) GetGroup() string {
- return k.group
-}
-
-// GetTopic returns kafka topic configured
-func (k KafkaConfig) GetTopic() string {
- return k.topic
-} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go
new file mode 100644
index 00000000..3db3e420
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go
@@ -0,0 +1,19 @@
+package utils
+
+// Pipeline type represents a stucture of a general pipeline
+type Pipeline struct{
+ KafkaConfiguration KafkaConfig `json:"kafkaConfig"`
+ HdfsConfiguration HdfsConfig `json:"hdfsConfig"`
+}
+
+// HdfsConfig type represents the config items of HDFS
+type HdfsConfig struct {
+ HdfsURL string `json:"hdfs_url"`
+}
+
+// KafkaConfig type represents the config items of Kafka
+type KafkaConfig struct {
+ Broker string `json:"broker"`
+ Group string `json:"group"`
+ Topic string `json:"topic"`
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
index c207967f..22e173f9 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
@@ -1,19 +1,15 @@
-apiVersion: skaffold/v1beta6
+apiVersion: skaffold/v1beta13
kind: Config
build:
+ artifacts:
+ - image: hdfs-writer
tagPolicy:
sha256: {}
- artifacts:
- - context: .
- image: hdfs-writer
- local:
- useBuildkit: false
- useDockerCLI: false
deploy:
kubectl:
manifests:
- - kubernetes-manifests/**
-profiles:
- - name: cloudbuild
- build:
- googleCloudBuild: {}
+ - kubernetes-manifests/configmap_hdfs.yaml
+ - kubernetes-manifests/configmap_kafka.yaml
+ - kubernetes-manifests/configmap_writer.yaml
+ - kubernetes-manifests/hdfs_writer_deployment.yaml
+ - kubernetes-manifests/hdfs_writer_service.yaml