diff options
10 files changed, 225 insertions, 253 deletions
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile index ee476717..dd2b475d 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile @@ -7,7 +7,7 @@ RUN go get -u github.com/go-delve/delve/cmd/dlv WORKDIR /src/hdfs-writer RUN mkdir /librdkafka-dir && cd /librdkafka-dir -RUN git clone https://github.com/edenhill/librdkafka.git && \ +RUN git clone https://github.com/edenhill/librdkafka.git && \ cd librdkafka && \ ./configure --prefix /usr && \ make && \ diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go index a79a3e06..4f3cfcbc 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go @@ -6,9 +6,9 @@ import ( "net/http" "os" "os/signal" - "time" handler "hdfs-writer/pkg/handler" + pipeline "hdfs-writer/pkg/pipeline" utils "hdfs-writer/pkg/utils" ) @@ -22,23 +22,24 @@ func main() { Addr: ":9393", } - connectionsClose := make(chan struct{}) + connectionsClose := make(chan bool) go func() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c // function literal waiting to receive Interrupt signal fmt.Printf(":::Got the kill signal:::") slogger.Info(":::Got the kill signal:::") - for eachWriter, eachChannel := range handler.ChannelMap { + for eachWriter, eachChannel := range pipeline.ChannelMap { + slogger.Info("Begin:: Closing writer goroutines::") slogger.Infof("Closing writer goroutine :: %s", eachWriter) - slogger.Infof("eachChannel:: %v", eachChannel) - close(eachChannel) - // This wait time ensures that the each of the channel is killed before - // main routine finishes. - time.Sleep(time.Second * 5) + delete(pipeline.ChannelMap, eachWriter) + eachChannel <- true } - //once all goroutines are signalled, send close to main thread + httpServer.Shutdown(context.Background()) + /*once all goroutines are signalled and httpServer is shutdown, + send close to main thread */ + connectionsClose <- true close(connectionsClose) }() @@ -47,5 +48,6 @@ func main() { if err != nil && err != http.ErrServerClosed { slogger.Fatal(err) } + pipeline.Wg.Wait() <-connectionsClose //main thread waiting to receive close signal } diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go index 65021b4a..568cb8c6 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go @@ -1,96 +1,116 @@ package handler - import ( - "fmt" - "net/http" - "io/ioutil" "encoding/json" + "fmt" "github.com/gorilla/mux" + "io/ioutil" + "net/http" + "strings" - guuid "github.com/google/uuid" pipeline "hdfs-writer/pkg/pipeline" utils "hdfs-writer/pkg/utils" ) - var slogger = utils.GetLoggerInstance() -// ChannelMap is the global map to store writerNames as key and channels as values. -var ChannelMap =make(map[string]chan struct{}) - - -// This is a sample test request handler -func testFunc(w http.ResponseWriter, r *http.Request){ - slogger.Info("Invoking testFunc ...") - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w,"HTTP Test successful ") -} // CreateRouter returns a http handler for the registered URLs -func CreateRouter() http.Handler{ +func CreateRouter() http.Handler { router := mux.NewRouter().StrictSlash(true) slogger.Info("Created router ...") - router.HandleFunc("/test", testFunc).Methods("GET") - router.HandleFunc("/createWriter", createWriter).Methods("POST") - router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE") + router.HandleFunc("/v1/writer", createWriter).Methods("POST") + router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE") + router.HandleFunc("/v1/writers", getAllWriters).Methods("GET") return router } - // CreateWriter creates a pipeline -func createWriter(w http.ResponseWriter, r *http.Request){ - reqBody, _ := ioutil.ReadAll(r.Body) +func createWriter(w http.ResponseWriter, r *http.Request) { + if r.Body == nil { + http.Error(w, "Empty body", http.StatusBadRequest) + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } slogger.Info(string(reqBody)) - var results map[string]interface{} - json.Unmarshal(reqBody, &results) - if len(results)==0{ - slogger.Fatalf("Unable to read from the config json file, unable to create configObject map") + var results utils.Pipeline + error := json.Unmarshal(reqBody, &results) + if error != nil { + unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error()) + fmt.Fprintln(w, unableToParse) + return } - writerStr := "writer" - writer := results[writerStr].(map[string]interface{}) - kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{}) - hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{}) - - kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj) - hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj) - - //populate the channelMap - pipelineChan := make(chan struct{}) - slogger.Infof("Channel created by post :: %v", pipelineChan) - uuid := guuid.New().String() - //slogger.Infof("guuid :: %s",uuid) - slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") - writerName := writerStr+"-"+uuid[len(uuid)-4:] - slogger.Infof("::writerName:: %s ",writerName) - ChannelMap[writerName] = pipelineChan - - // envoke the go routine to build pipeline - go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName]) + if validateKafkaConfig(results.KafkaConfiguration) == false { + http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest) + return + } + if validateHdfsConfig(results.HdfsConfiguration) == false { + http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest) + return + } + writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration) successMessage := fmt.Sprintf("Created the writer ::%s", writerName) - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w,successMessage) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + fmt.Fprintln(w, successMessage) } - // deleteWriter deletes a given writer pipeline -func deleteWriter(w http.ResponseWriter, r *http.Request){ +func deleteWriter(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) writerName := vars["writerName"] - if _, keyExists := ChannelMap[writerName]; keyExists{ - slogger.Infof("::Writer to be closed:: %s",writerName) - toBeClosedChannel := ChannelMap[writerName] - close(toBeClosedChannel) - // deleting the channel from ChannelMap after closure to - // avoid closing the closed channel - delete(ChannelMap, writerName) - + if _, keyExists := pipeline.ChannelMap[writerName]; keyExists { + pipeline.DeletePipeline(writerName) w.WriteHeader(http.StatusOK) - deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName) - fmt.Fprintln(w,deleteMessage) - - }else{ - notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName) - fmt.Fprintln(w,notFoundMessage) + deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName) + fmt.Fprintln(w, deleteMessage) + } else { + notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName) + fmt.Fprintln(w, notFoundMessage) + + } +} + +// validateKafkaConfig validates the kafka config items and returns true if they are valid. +func validateKafkaConfig(k utils.KafkaConfig) bool { + if strings.TrimSpace(k.Broker) == "" { + fmt.Println("Broker is empty!") + slogger.Infof("Broker is empty!") + return false + } + if strings.TrimSpace(k.Group) == "" { + fmt.Println("Group is empty!") + slogger.Infof("Group is empty!") + return false } - -}
\ No newline at end of file + if strings.TrimSpace(k.Topic) == "" { + fmt.Println("Topic is empty!") + slogger.Infof("Topic is empty!") + return false + } + return true +} + +// validateHdfsConfig validates the kafka config items and returns true if they are valid. +func validateHdfsConfig(h utils.HdfsConfig) bool { + if strings.TrimSpace(h.HdfsURL) == "" { + fmt.Println("HdfsURL is empty!") + return false + } + return true +} + +// getAllWriters list down the active writers +func getAllWriters(w http.ResponseWriter, r *http.Request) { + slogger.Info("Listing all the writers ...") + var listOfWriters []string + for k := range pipeline.ChannelMap { + listOfWriters = append(listOfWriters, k) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters))) +} diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go index c5dbd3cd..2e192e99 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go @@ -2,52 +2,84 @@ package pipeline import ( "fmt" - "os" "github.com/colinmarc/hdfs" "github.com/confluentinc/confluent-kafka-go/kafka" + guuid "github.com/google/uuid" + "os" + "sync" + utils "hdfs-writer/pkg/utils" - ) -// BuildWriterPipeline builds a pipeline -func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) { - slogger := utils.GetLoggerInstance() +var slogger = utils.GetLoggerInstance() + +// ChannelMap is the global map to store writerNames as key and channels as values. +//var ChannelMap =make(map[string]chan struct{}) +var ChannelMap = make(map[string]chan bool) + +// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a +var Wg sync.WaitGroup +var writerStr = "writer" + +// CreatePipeline initiates the building of a pipeline +func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string { + //pipelineChan := make(chan struct{}) + pipelineChan := make(chan bool) + uuid := guuid.New().String() + slogger.Infof(":: Storing writerName and channel in ChannelMap :: ") + writerName := writerStr + "-" + uuid[len(uuid)-4:] + slogger.Infof("::writerName:: %s ", writerName) + ChannelMap[writerName] = pipelineChan + + //Every create request shall add 1 to the WaitGroup + Wg.Add(1) + // envoke the go routine to build pipeline + go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName]) + return writerName +} + +// buildWriterPipeline builds a pipeline +func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) { + topics := make([]string, 1) - topics[0] = k.GetTopic() - - c,err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": k.GetBroker(), + topics[0] = k.Topic + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": k.Broker, "broker.address.family": "v4", - "group.id": k.GetGroup(), + "group.id": k.Group, "session.timeout.ms": 6000, "auto.offset.reset": "earliest"}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) - os.Exit(1) + slogger.Info("Failed to create consumer: %s", err.Error()) + delete(ChannelMap, writerName) + Wg.Done() + return } fmt.Printf("Created Consumer %v\n", c) err = c.SubscribeTopics(topics, nil) - run := true setUpPipeline := false var hdfsFileWriter *hdfs.FileWriter var hdfsFileWriterError error // HDFS CLIENT CREATION //client := utils.GetHdfsClientInstance(h.GetHdfsURL()) - client := utils.CreateHdfsClient(h.GetHdfsURL()) - + client := utils.CreateHdfsClient(h.HdfsURL) - for run==true { + for { select { case sig := <-sigchan: + defer Wg.Done() client.Close() - if hdfsFileWriter!=nil{ + if hdfsFileWriter != nil { cleanup(hdfsFileWriter) } slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName) - run = false + close(sigchan) + return default: //slogger.Info("Running default option ....") ev := c.Poll(100) @@ -55,51 +87,52 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str continue } //:: BEGIN : Switch between different types of messages that come out of kafka - switch e := ev.(type){ + switch e := ev.(type) { case *kafka.Message: slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value) dataStr := string(e.Value) slogger.Infof("byte array ::: %s", []byte(dataStr)) - fileInfo, fileInfoError := client.Stat("/" + k.GetTopic()) + fileInfo, fileInfoError := client.Stat("/" + k.Topic) // create file if it doesnt exists already if fileInfoError != nil { - slogger.Infof("Error::: %s",fileInfoError) - slogger.Infof("Creating file::: %s", "/"+k.GetTopic()) - hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic()) - if hdfsFileWriterError !=nil { + slogger.Infof("Error::: %s", fileInfoError) + slogger.Infof("Creating file::: %s", "/"+k.Topic) + hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic) + if hdfsFileWriterError != nil { slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s", - "/"+k.GetTopic(), hdfsFileWriterError.Error()) - panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic())) + "/"+k.Topic, hdfsFileWriterError.Error()) + continue } - _= client.Chmod("/"+k.GetTopic(), 0777); + _ = client.Chmod("/"+k.Topic, 0777) } newDataStr := dataStr + "\n" // file exists case, so just append - hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name()) - - if hdfsFileWriterError != nil || hdfsFileWriter==nil{ - if(hdfsFileWriter==nil){ + hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name()) + + if hdfsFileWriterError != nil { + if hdfsFileWriter == nil { slogger.Infof("hdfsFileWriter is NULL !!") } slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n", - "/"+k.GetTopic(),hdfsFileWriterError) - panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic())) - } - bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) - if bytesWritten > 0 && error == nil { - slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) - slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) - - if setUpPipeline==false{ - slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ - "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL()) - setUpPipeline = true - } + "/"+k.Topic, hdfsFileWriterError) + continue } else { - slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error) + bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr)) + if bytesWritten > 0 && error == nil { + slogger.Infof("::: Wrote %s to HDFS:::", newDataStr) + slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten) + + if setUpPipeline == false { + slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+ + "watching for more messages.. ", k.Topic, h.HdfsURL) + setUpPipeline = true + } + hdfsFileWriter.Close() + } else { + slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error) + } } - hdfsFileWriter.Close() - + case kafka.Error: // Errors should generally be considered // informational, the client will try to @@ -108,23 +141,33 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str // the application if all brokers are down. fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) if e.Code() == kafka.ErrAllBrokersDown { - run = false + return } - + default: fmt.Printf("Ignored %v\n", e) } //:: END : Switch between different types of messages that come out of kafka } // END: select channel } // END : infinite loop - - fmt.Printf("Closing the consumer") } -func cleanup(h *hdfs.FileWriter){ - if h!=nil{ +func cleanup(h *hdfs.FileWriter) { + if h != nil { err := h.Close() - if err!=nil{ - fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error()) + if err != nil { + fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error()) } + } -}
\ No newline at end of file + fmt.Printf("\n:::Clean up executed ::: \n") +} + +// DeletePipeline deletes a writer pipeline +func DeletePipeline(writerName string) { + slogger.Infof("::Writer to be closed:: %s", writerName) + toBeClosedChannel := ChannelMap[writerName] + toBeClosedChannel <- true + // deleting the channel from ChannelMap after closure to + // avoid closing the closed channel + delete(ChannelMap, writerName) +} diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json index 9a41d91b..055dac1f 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json @@ -1,4 +1,4 @@ -{"writer": { +{ "kafkaConfig": { "broker": "kafka-cluster-kafka-bootstrap:9092", "group": "grp1", @@ -7,5 +7,4 @@ "hdfsConfig": { "hdfs_url": "hdfs1-namenode:8020" } -} }
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go deleted file mode 100644 index ac33bc6a..00000000 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go +++ /dev/null @@ -1,37 +0,0 @@ -package utils - -import ( - "os" -) - -// SetHdfsParametersByObjectMap set the value of the hdfs config parameters -// and return HdfsConfig object -func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{ - - hc := HdfsConfig{} - hc.hdfsURL = m["hdfs_url"].(string) - return hc - -} - -// SetHdfsParametersByEnvVariables sets the hdfs parameters -func SetHdfsParametersByEnvVariables() HdfsConfig { - - slogger := GetLoggerInstance() - hdfsConfigObject := HdfsConfig{ - hdfsURL: os.Getenv("HDFS_URL"), - } - slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL) - return hdfsConfigObject - -} - -// HdfsConfig contains hdfs related config items -type HdfsConfig struct { - hdfsURL string -} - -// GetHdfsURL returns HdfsURL -func (h HdfsConfig) GetHdfsURL() string { - return h.hdfsURL -}
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go index 1a93a5ad..8edcec19 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go @@ -3,24 +3,9 @@ package utils import ( "fmt" "github.com/colinmarc/hdfs" - //"sync" - //"go.uber.org/zap" ) -//var clientOnce sync.Once -//var hdfsClient *hdfs.Client -//var slogger *zap.SugaredLogger - - -//GetHdfsClientInstance returns a singleton hdfsClient instance -// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){ -// clientOnce.Do(func(){ -// hdfsClient = createHdfsClient(hdfsURL) -// }) -// return hdfsClient -// } - //CreateHdfsClient creates a hdfs client and returns hdfs client func CreateHdfsClient(hdfsURL string) (*hdfs.Client){ slogger := GetLoggerInstance() diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go deleted file mode 100644 index 080bfd4b..00000000 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go +++ /dev/null @@ -1,55 +0,0 @@ -package utils - - -import ( - "os" -) - -// SetKafkaParametersByObjectMap sets the value of the kafka parameters -// and sets the KafkaConfig object -func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig { - kc := KafkaConfig{} - kc.broker = m["broker"].(string) - kc.group = m["group"].(string) - kc.topic = m["topic"].(string) - - return kc -} - -// SetKafkaParametersByEnvVariables sets the kafka parameters -func SetKafkaParametersByEnvVariables() KafkaConfig { - slogger := GetLoggerInstance() - - kafkaConfigObject := KafkaConfig{ - broker: os.Getenv("BROKER"), - group: os.Getenv("GROUP"), - topic: os.Getenv("TOPIC"), - } - slogger.Infof("::broker:: %s", kafkaConfigObject.broker) - slogger.Infof("::group:: %s", kafkaConfigObject.group) - slogger.Infof("::topic:: %s", kafkaConfigObject.topic) - - return kafkaConfigObject -} - -// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time -type KafkaConfig struct { - broker string - group string - topic string -} - -// GetBroker returns kafka broker configured -func (k KafkaConfig) GetBroker() string { - return k.broker -} - -// GetGroup returns kafka group configured -func (k KafkaConfig) GetGroup() string { - return k.group -} - -// GetTopic returns kafka topic configured -func (k KafkaConfig) GetTopic() string { - return k.topic -}
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go new file mode 100644 index 00000000..3db3e420 --- /dev/null +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/types.go @@ -0,0 +1,19 @@ +package utils + +// Pipeline type represents a stucture of a general pipeline +type Pipeline struct{ + KafkaConfiguration KafkaConfig `json:"kafkaConfig"` + HdfsConfiguration HdfsConfig `json:"hdfsConfig"` +} + +// HdfsConfig type represents the config items of HDFS +type HdfsConfig struct { + HdfsURL string `json:"hdfs_url"` +} + +// KafkaConfig type represents the config items of Kafka +type KafkaConfig struct { + Broker string `json:"broker"` + Group string `json:"group"` + Topic string `json:"topic"` +}
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml index c207967f..22e173f9 100644 --- a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml +++ b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml @@ -1,19 +1,15 @@ -apiVersion: skaffold/v1beta6 +apiVersion: skaffold/v1beta13 kind: Config build: + artifacts: + - image: hdfs-writer tagPolicy: sha256: {} - artifacts: - - context: . - image: hdfs-writer - local: - useBuildkit: false - useDockerCLI: false deploy: kubectl: manifests: - - kubernetes-manifests/** -profiles: - - name: cloudbuild - build: - googleCloudBuild: {} + - kubernetes-manifests/configmap_hdfs.yaml + - kubernetes-manifests/configmap_kafka.yaml + - kubernetes-manifests/configmap_writer.yaml + - kubernetes-manifests/hdfs_writer_deployment.yaml + - kubernetes-manifests/hdfs_writer_service.yaml |