package pipeline

import (
	"fmt"
	"github.com/colinmarc/hdfs"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	guuid "github.com/google/uuid"
	"os"
	"sync"

	utils "hdfs-writer/pkg/utils"
)

var slogger = utils.GetLoggerInstance()

// ChannelMap is the global map to store writerNames as key and channels as values.
//var ChannelMap =make(map[string]chan struct{})
var ChannelMap = make(map[string]chan bool)

// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a
var Wg sync.WaitGroup
var writerStr = "writer"

// CreatePipeline initiates the building of a pipeline
func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
	//pipelineChan := make(chan struct{})
	pipelineChan := make(chan bool)
	uuid := guuid.New().String()
	slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
	writerName := writerStr + "-" + uuid[len(uuid)-4:]
	slogger.Infof("::writerName:: %s ", writerName)
	ChannelMap[writerName] = pipelineChan

	//Every create request shall add 1 to the WaitGroup
	Wg.Add(1)
	// envoke the go routine to build pipeline
	go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName])
	return writerName
}

// buildWriterPipeline builds a pipeline
func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {

	topics := make([]string, 1)
	topics[0] = k.Topic

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     k.Broker,
		"broker.address.family": "v4",
		"group.id":              k.Group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		slogger.Info("Failed to create consumer: %s", err.Error())
		delete(ChannelMap, writerName)
		Wg.Done()
		return
	}
	fmt.Printf("Created Consumer %v\n", c)
	err = c.SubscribeTopics(topics, nil)

	setUpPipeline := false

	var hdfsFileWriter *hdfs.FileWriter
	var hdfsFileWriterError error
	// HDFS CLIENT CREATION
	//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
	client := utils.CreateHdfsClient(h.HdfsURL)

	for {
		select {
		case sig := <-sigchan:
			defer Wg.Done()
			client.Close()
			if hdfsFileWriter != nil {
				cleanup(hdfsFileWriter)
			}
			slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
			close(sigchan)
			return
		default:
			//slogger.Info("Running default option ....")
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			//:: BEGIN : Switch between different types of messages that come out of kafka
			switch e := ev.(type) {
			case *kafka.Message:
				slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
				dataStr := string(e.Value)
				slogger.Infof("byte array ::: %s", []byte(dataStr))
				fileInfo, fileInfoError := client.Stat("/" + k.Topic)
				// create file if it doesnt exists already
				if fileInfoError != nil {
					slogger.Infof("Error::: %s", fileInfoError)
					slogger.Infof("Creating file::: %s", "/"+k.Topic)
					hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
					if hdfsFileWriterError != nil {
						slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
							"/"+k.Topic, hdfsFileWriterError.Error())
						continue
					}
					_ = client.Chmod("/"+k.Topic, 0777)
				}
				newDataStr := dataStr + "\n"
				// file exists case, so just append
				hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())

				if hdfsFileWriterError != nil {
					if hdfsFileWriter == nil {
						slogger.Infof("hdfsFileWriter is NULL !!")
					}
					slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
						"/"+k.Topic, hdfsFileWriterError)
					continue
				} else {
					bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
					if bytesWritten > 0 && error == nil {
						slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
						slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)

						if setUpPipeline == false {
							slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
								"watching for more messages.. ", k.Topic, h.HdfsURL)
							setUpPipeline = true
						}
						hdfsFileWriter.Close()
					} else {
						slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error)
					}
				}

			case kafka.Error:
				// Errors should generally be considered
				// informational, the client will try to
				// automatically recover.
				// But in this example we choose to terminate
				// the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					return
				}

			default:
				fmt.Printf("Ignored %v\n", e)
			} //:: END : Switch between different types of messages that come out of kafka
		} // END: select channel
	} // END : infinite loop
}

func cleanup(h *hdfs.FileWriter) {
	if h != nil {
		err := h.Close()
		if err != nil {
			fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
		}

	}
	fmt.Printf("\n:::Clean up executed ::: \n")
}

// DeletePipeline deletes a writer pipeline
func DeletePipeline(writerName string) {
	slogger.Infof("::Writer to be closed:: %s", writerName)
	toBeClosedChannel := ChannelMap[writerName]
	toBeClosedChannel <- true
	// deleting the channel from ChannelMap after closure to
	// avoid closing the closed channel
	delete(ChannelMap, writerName)
}