#!/usr/bin/env python3

# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer
# from .producer.CustomKafkaProducer import CustomKafkaProducer

import json
import concurrent.futures
import logging
import time

from consumer import CustomKafkaConsumer
from producer import CustomKafkaProducer

logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')

def main():
    # Begin: sample producer that replays metrics from a file
    customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer()
    with open("./multithreading-metrics.json") as input_file:
        for each_line in input_file:
            # Each line is expected to be a JSON object carrying Prometheus-style
            # labels, e.g. {"labels": {"__name__": "...", ...}, ...}
            python_obj = json.loads(each_line)
            # Key each record by its metric name taken from the labels
            customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"])
    # End: sample producer based on file
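    # If CustomKafkaProducer exposes a flush()/close() method (its API is not shown
    # in this file, so this is an assumption), calling it here would ensure all
    # records are delivered before the consumer starts.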

    # Consumer that collects records in the background; executeQuery() below
    # filters what has been collected by metric name and instance IP
    customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer()

    # Form the data structure used for query formation
    queries = [
        {"metric_name": "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"},
        {"metric_name": "go_gc_duration_seconds_count", "ip": "10.42.1.92:8686"},
    ]
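    # Both queries watch the same metric, go_gc_duration_seconds_count, scraped from
    # two different target instances; add entries here to track further metric/IP pairs.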

    # Run the Kafka consumer loop on a single background worker thread so the
    # main thread stays free to poll the collected records
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    executor.submit(customKafkaConsumer.consume)
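    # consume() is expected to block and keep filling the consumer's internal buffer,
    # so the executor is never shut down here; the polling loop below runs forever.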

    # Poll the collected records for each query indefinitely
    while True:
        for each_record in queries:
            list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"])
            logging.info("The records collected :: %s", list_of_records)
            logging.info("The length of records collected: %s", len(list_of_records))
            print("The records :: {}".format(list_of_records))
        # Brief pause between polling rounds to avoid busy-waiting (interval is arbitrary)
        time.sleep(5)


if __name__ == '__main__':
    main()