aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs')
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile2
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go20
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go154
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go153
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json3
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go37
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go15
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go55
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go19
-rw-r--r--vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml20
10 files changed, 225 insertions, 253 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
index ee476717..dd2b475d 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
@@ -7,7 +7,7 @@ RUN go get -u github.com/go-delve/delve/cmd/dlv
WORKDIR /src/hdfs-writer
RUN mkdir /librdkafka-dir && cd /librdkafka-dir
-RUN git clone https://github.com/edenhill/librdkafka.git && \
+RUN git clone https://github.com/edenhill/librdkafka.git && \
cd librdkafka && \
./configure --prefix /usr && \
make && \
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
index a79a3e06..4f3cfcbc 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
@@ -6,9 +6,9 @@ import (
"net/http"
"os"
"os/signal"
- "time"
handler "hdfs-writer/pkg/handler"
+ pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
@@ -22,23 +22,24 @@ func main() {
Addr: ":9393",
}
- connectionsClose := make(chan struct{})
+ connectionsClose := make(chan bool)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c // function literal waiting to receive Interrupt signal
fmt.Printf(":::Got the kill signal:::")
slogger.Info(":::Got the kill signal:::")
- for eachWriter, eachChannel := range handler.ChannelMap {
+ for eachWriter, eachChannel := range pipeline.ChannelMap {
+ slogger.Info("Begin:: Closing writer goroutines::")
slogger.Infof("Closing writer goroutine :: %s", eachWriter)
- slogger.Infof("eachChannel:: %v", eachChannel)
- close(eachChannel)
- // This wait time ensures that the each of the channel is killed before
- // main routine finishes.
- time.Sleep(time.Second * 5)
+ delete(pipeline.ChannelMap, eachWriter)
+ eachChannel <- true
}
- //once all goroutines are signalled, send close to main thread
+
httpServer.Shutdown(context.Background())
+ /*once all goroutines are signalled and httpServer is shutdown,
+ send close to main thread */
+ connectionsClose <- true
close(connectionsClose)
}()
@@ -47,5 +48,6 @@ func main() {
if err != nil && err != http.ErrServerClosed {
slogger.Fatal(err)
}
+ pipeline.Wg.Wait()
<-connectionsClose //main thread waiting to receive close signal
}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
index 65021b4a..568cb8c6 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
@@ -1,96 +1,116 @@
package handler
-
import (
- "fmt"
- "net/http"
- "io/ioutil"
"encoding/json"
+ "fmt"
"github.com/gorilla/mux"
+ "io/ioutil"
+ "net/http"
+ "strings"
- guuid "github.com/google/uuid"
pipeline "hdfs-writer/pkg/pipeline"
utils "hdfs-writer/pkg/utils"
)
-
var slogger = utils.GetLoggerInstance()
-// ChannelMap is the global map to store writerNames as key and channels as values.
-var ChannelMap =make(map[string]chan struct{})
-
-
-// This is a sample test request handler
-func testFunc(w http.ResponseWriter, r *http.Request){
- slogger.Info("Invoking testFunc ...")
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,"HTTP Test successful ")
-}
// CreateRouter returns a http handler for the registered URLs
-func CreateRouter() http.Handler{
+func CreateRouter() http.Handler {
router := mux.NewRouter().StrictSlash(true)
slogger.Info("Created router ...")
- router.HandleFunc("/test", testFunc).Methods("GET")
- router.HandleFunc("/createWriter", createWriter).Methods("POST")
- router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writer", createWriter).Methods("POST")
+ router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
+ router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
return router
}
-
// CreateWriter creates a pipeline
-func createWriter(w http.ResponseWriter, r *http.Request){
- reqBody, _ := ioutil.ReadAll(r.Body)
+func createWriter(w http.ResponseWriter, r *http.Request) {
+ if r.Body == nil {
+ http.Error(w, "Empty body", http.StatusBadRequest)
+ return
+ }
+ reqBody, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+ return
+ }
slogger.Info(string(reqBody))
- var results map[string]interface{}
- json.Unmarshal(reqBody, &results)
- if len(results)==0{
- slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+ var results utils.Pipeline
+ error := json.Unmarshal(reqBody, &results)
+ if error != nil {
+ unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
+ fmt.Fprintln(w, unableToParse)
+ return
}
- writerStr := "writer"
- writer := results[writerStr].(map[string]interface{})
- kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
- hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
-
- kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
- hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
-
- //populate the channelMap
- pipelineChan := make(chan struct{})
- slogger.Infof("Channel created by post :: %v", pipelineChan)
- uuid := guuid.New().String()
- //slogger.Infof("guuid :: %s",uuid)
- slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
- writerName := writerStr+"-"+uuid[len(uuid)-4:]
- slogger.Infof("::writerName:: %s ",writerName)
- ChannelMap[writerName] = pipelineChan
-
- // envoke the go routine to build pipeline
- go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+ if validateKafkaConfig(results.KafkaConfiguration) == false {
+ http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ if validateHdfsConfig(results.HdfsConfiguration) == false {
+ http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
+ return
+ }
+ writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
- w.WriteHeader(http.StatusOK)
- fmt.Fprintln(w,successMessage)
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintln(w, successMessage)
}
-
// deleteWriter deletes a given writer pipeline
-func deleteWriter(w http.ResponseWriter, r *http.Request){
+func deleteWriter(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
writerName := vars["writerName"]
- if _, keyExists := ChannelMap[writerName]; keyExists{
- slogger.Infof("::Writer to be closed:: %s",writerName)
- toBeClosedChannel := ChannelMap[writerName]
- close(toBeClosedChannel)
- // deleting the channel from ChannelMap after closure to
- // avoid closing the closed channel
- delete(ChannelMap, writerName)
-
+ if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
+ pipeline.DeletePipeline(writerName)
w.WriteHeader(http.StatusOK)
- deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
- fmt.Fprintln(w,deleteMessage)
-
- }else{
- notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
- fmt.Fprintln(w,notFoundMessage)
+ deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
+ fmt.Fprintln(w, deleteMessage)
+ } else {
+ notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
+ fmt.Fprintln(w, notFoundMessage)
+
+ }
+}
+
+// validateKafkaConfig validates the kafka config items and returns true if they are valid.
+func validateKafkaConfig(k utils.KafkaConfig) bool {
+ if strings.TrimSpace(k.Broker) == "" {
+ fmt.Println("Broker is empty!")
+ slogger.Infof("Broker is empty!")
+ return false
+ }
+ if strings.TrimSpace(k.Group) == "" {
+ fmt.Println("Group is empty!")
+ slogger.Infof("Group is empty!")
+ return false
}
-
-} \ No newline at end of file
+ if strings.TrimSpace(k.Topic) == "" {
+ fmt.Println("Topic is empty!")
+ slogger.Infof("Topic is empty!")
+ return false
+ }
+ return true
+}
+
+// validateHdfsConfig validates the kafka config items and returns true if they are valid.
+func validateHdfsConfig(h utils.HdfsConfig) bool {
+ if strings.TrimSpace(h.HdfsURL) == "" {
+ fmt.Println("HdfsURL is empty!")
+ return false
+ }
+ return true
+}
+
+// getAllWriters list down the active writers
+func getAllWriters(w http.ResponseWriter, r *http.Request) {
+ slogger.Info("Listing all the writers ...")
+ var listOfWriters []string
+ for k := range pipeline.ChannelMap {
+ listOfWriters = append(listOfWriters, k)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
index c5dbd3cd..2e192e99 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
@@ -2,52 +2,84 @@ package pipeline
import (
"fmt"
- "os"
"github.com/colinmarc/hdfs"
"github.com/confluentinc/confluent-kafka-go/kafka"
+ guuid "github.com/google/uuid"
+ "os"
+ "sync"
+
utils "hdfs-writer/pkg/utils"
-
)
-// BuildWriterPipeline builds a pipeline
-func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
- slogger := utils.GetLoggerInstance()
+var slogger = utils.GetLoggerInstance()
+
+// ChannelMap is the global map to store writerNames as key and channels as values.
+//var ChannelMap =make(map[string]chan struct{})
+var ChannelMap = make(map[string]chan bool)
+
+// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a
+var Wg sync.WaitGroup
+var writerStr = "writer"
+
+// CreatePipeline initiates the building of a pipeline
+func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
+ //pipelineChan := make(chan struct{})
+ pipelineChan := make(chan bool)
+ uuid := guuid.New().String()
+ slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+ writerName := writerStr + "-" + uuid[len(uuid)-4:]
+ slogger.Infof("::writerName:: %s ", writerName)
+ ChannelMap[writerName] = pipelineChan
+
+ //Every create request shall add 1 to the WaitGroup
+ Wg.Add(1)
+ // envoke the go routine to build pipeline
+ go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName])
+ return writerName
+}
+
+// buildWriterPipeline builds a pipeline
+func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {
+
topics := make([]string, 1)
- topics[0] = k.GetTopic()
-
- c,err := kafka.NewConsumer(&kafka.ConfigMap{
- "bootstrap.servers": k.GetBroker(),
+ topics[0] = k.Topic
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": k.Broker,
"broker.address.family": "v4",
- "group.id": k.GetGroup(),
+ "group.id": k.Group,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest"})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
- os.Exit(1)
+ slogger.Info("Failed to create consumer: %s", err.Error())
+ delete(ChannelMap, writerName)
+ Wg.Done()
+ return
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
- run := true
setUpPipeline := false
var hdfsFileWriter *hdfs.FileWriter
var hdfsFileWriterError error
// HDFS CLIENT CREATION
//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
- client := utils.CreateHdfsClient(h.GetHdfsURL())
-
+ client := utils.CreateHdfsClient(h.HdfsURL)
- for run==true {
+ for {
select {
case sig := <-sigchan:
+ defer Wg.Done()
client.Close()
- if hdfsFileWriter!=nil{
+ if hdfsFileWriter != nil {
cleanup(hdfsFileWriter)
}
slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
- run = false
+ close(sigchan)
+ return
default:
//slogger.Info("Running default option ....")
ev := c.Poll(100)
@@ -55,51 +87,52 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
continue
}
//:: BEGIN : Switch between different types of messages that come out of kafka
- switch e := ev.(type){
+ switch e := ev.(type) {
case *kafka.Message:
slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
dataStr := string(e.Value)
slogger.Infof("byte array ::: %s", []byte(dataStr))
- fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+ fileInfo, fileInfoError := client.Stat("/" + k.Topic)
// create file if it doesnt exists already
if fileInfoError != nil {
- slogger.Infof("Error::: %s",fileInfoError)
- slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
- hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
- if hdfsFileWriterError !=nil {
+ slogger.Infof("Error::: %s", fileInfoError)
+ slogger.Infof("Creating file::: %s", "/"+k.Topic)
+ hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
+ if hdfsFileWriterError != nil {
slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
- "/"+k.GetTopic(), hdfsFileWriterError.Error())
- panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+ "/"+k.Topic, hdfsFileWriterError.Error())
+ continue
}
- _= client.Chmod("/"+k.GetTopic(), 0777);
+ _ = client.Chmod("/"+k.Topic, 0777)
}
newDataStr := dataStr + "\n"
// file exists case, so just append
- hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
-
- if hdfsFileWriterError != nil || hdfsFileWriter==nil{
- if(hdfsFileWriter==nil){
+ hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())
+
+ if hdfsFileWriterError != nil {
+ if hdfsFileWriter == nil {
slogger.Infof("hdfsFileWriter is NULL !!")
}
slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
- "/"+k.GetTopic(),hdfsFileWriterError)
- panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
- }
- bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
- if bytesWritten > 0 && error == nil {
- slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
- slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
-
- if setUpPipeline==false{
- slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
- "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
- setUpPipeline = true
- }
+ "/"+k.Topic, hdfsFileWriterError)
+ continue
} else {
- slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+ bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
+ if bytesWritten > 0 && error == nil {
+ slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+ slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+
+ if setUpPipeline == false {
+ slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+ "watching for more messages.. ", k.Topic, h.HdfsURL)
+ setUpPipeline = true
+ }
+ hdfsFileWriter.Close()
+ } else {
+ slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error)
+ }
}
- hdfsFileWriter.Close()
-
+
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
@@ -108,23 +141,33 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
// the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
- run = false
+ return
}
-
+
default:
fmt.Printf("Ignored %v\n", e)
} //:: END : Switch between different types of messages that come out of kafka
} // END: select channel
} // END : infinite loop
-
- fmt.Printf("Closing the consumer")
}
-func cleanup(h *hdfs.FileWriter){
- if h!=nil{
+func cleanup(h *hdfs.FileWriter) {
+ if h != nil {
err := h.Close()
- if err!=nil{
- fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+ if err != nil {
+ fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
}
+
}
-} \ No newline at end of file
+ fmt.Printf("\n:::Clean up executed ::: \n")
+}
+
+// DeletePipeline deletes a writer pipeline
+func DeletePipeline(writerName string) {
+ slogger.Infof("::Writer to be closed:: %s", writerName)
+ toBeClosedChannel := ChannelMap[writerName]
+ toBeClosedChannel <- true
+ // deleting the channel from ChannelMap after closure to
+ // avoid closing the closed channel
+ delete(ChannelMap, writerName)
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
index 9a41d91b..055dac1f 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
@@ -1,4 +1,4 @@
-{"writer": {
+{
"kafkaConfig": {
"broker": "kafka-cluster-kafka-bootstrap:9092",
"group": "grp1",
@@ -7,5 +7,4 @@
"hdfsConfig": {
"hdfs_url": "hdfs1-namenode:8020"
}
-}
} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
deleted file mode 100644
index ac33bc6a..00000000
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package utils
-
-import (
- "os"
-)
-
-// SetHdfsParametersByObjectMap set the value of the hdfs config parameters
-// and return HdfsConfig object
-func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{
-
- hc := HdfsConfig{}
- hc.hdfsURL = m["hdfs_url"].(string)
- return hc
-
-}
-
-// SetHdfsParametersByEnvVariables sets the hdfs parameters
-func SetHdfsParametersByEnvVariables() HdfsConfig {
-
- slogger := GetLoggerInstance()
- hdfsConfigObject := HdfsConfig{
- hdfsURL: os.Getenv("HDFS_URL"),
- }
- slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL)
- return hdfsConfigObject
-
-}
-
-// HdfsConfig contains hdfs related config items
-type HdfsConfig struct {
- hdfsURL string
-}
-
-// GetHdfsURL returns HdfsURL
-func (h HdfsConfig) GetHdfsURL() string {
- return h.hdfsURL
-} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
index 1a93a5ad..8edcec19 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
@@ -3,24 +3,9 @@ package utils
import (
"fmt"
"github.com/colinmarc/hdfs"
- //"sync"
- //"go.uber.org/zap"
)
-//var clientOnce sync.Once
-//var hdfsClient *hdfs.Client
-//var slogger *zap.SugaredLogger
-
-
-//GetHdfsClientInstance returns a singleton hdfsClient instance
-// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){
-// clientOnce.Do(func(){
-// hdfsClient = createHdfsClient(hdfsURL)
-// })
-// return hdfsClient
-// }
-
//CreateHdfsClient creates a hdfs client and returns hdfs client
func CreateHdfsClient(hdfsURL string) (*hdfs.Client){
slogger := GetLoggerInstance()
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
deleted file mode 100644
index 080bfd4b..00000000
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package utils
-
-
-import (
- "os"
-)
-
-// SetKafkaParametersByObjectMap sets the value of the kafka parameters
-// and sets the KafkaConfig object
-func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig {
- kc := KafkaConfig{}
- kc.broker = m["broker"].(string)
- kc.group = m["group"].(string)
- kc.topic = m["topic"].(string)
-
- return kc
-}
-
-// SetKafkaParametersByEnvVariables sets the kafka parameters
-func SetKafkaParametersByEnvVariables() KafkaConfig {
- slogger := GetLoggerInstance()
-
- kafkaConfigObject := KafkaConfig{
- broker: os.Getenv("BROKER"),
- group: os.Getenv("GROUP"),
- topic: os.Getenv("TOPIC"),
- }
- slogger.Infof("::broker:: %s", kafkaConfigObject.broker)
- slogger.Infof("::group:: %s", kafkaConfigObject.group)
- slogger.Infof("::topic:: %s", kafkaConfigObject.topic)
-
- return kafkaConfigObject
-}
-
-// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time
-type KafkaConfig struct {
- broker string
- group string
- topic string
-}
-
-// GetBroker returns kafka broker configured
-func (k KafkaConfig) GetBroker() string {
- return k.broker
-}
-
-// GetGroup returns kafka group configured
-func (k KafkaConfig) GetGroup() string {
- return k.group
-}
-
-// GetTopic returns kafka topic configured
-func (k KafkaConfig) GetTopic() string {
- return k.topic
-} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go
new file mode 100644
index 00000000..3db3e420
--- /dev/null
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go
@@ -0,0 +1,19 @@
+package utils
+
+// Pipeline type represents a stucture of a general pipeline
+type Pipeline struct{
+ KafkaConfiguration KafkaConfig `json:"kafkaConfig"`
+ HdfsConfiguration HdfsConfig `json:"hdfsConfig"`
+}
+
+// HdfsConfig type represents the config items of HDFS
+type HdfsConfig struct {
+ HdfsURL string `json:"hdfs_url"`
+}
+
+// KafkaConfig type represents the config items of Kafka
+type KafkaConfig struct {
+ Broker string `json:"broker"`
+ Group string `json:"group"`
+ Topic string `json:"topic"`
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
index c207967f..22e173f9 100644
--- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
+++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
@@ -1,19 +1,15 @@
-apiVersion: skaffold/v1beta6
+apiVersion: skaffold/v1beta13
kind: Config
build:
+ artifacts:
+ - image: hdfs-writer
tagPolicy:
sha256: {}
- artifacts:
- - context: .
- image: hdfs-writer
- local:
- useBuildkit: false
- useDockerCLI: false
deploy:
kubectl:
manifests:
- - kubernetes-manifests/**
-profiles:
- - name: cloudbuild
- build:
- googleCloudBuild: {}
+ - kubernetes-manifests/configmap_hdfs.yaml
+ - kubernetes-manifests/configmap_kafka.yaml
+ - kubernetes-manifests/configmap_writer.yaml
+ - kubernetes-manifests/hdfs_writer_deployment.yaml
+ - kubernetes-manifests/hdfs_writer_service.yaml