diff options
author | Rajamohan Raj <rajamohan.raj@intel.com> | 2019-11-01 00:30:30 +0000 |
---|---|---|
committer | Marco Platania <platania@research.att.com> | 2019-11-14 14:10:09 +0000 |
commit | 53c9fab327d1d9a079154b01242cf0930c106989 (patch) | |
tree | 4cc6bc25a290c6ac3e6028122192de1dd9493646 /vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py | |
parent | fb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (diff) |
First working draft of kafka for inference app
Created a python based inference app which can query a given metrics for
a given duration from kafka topic.
Consumer runs on separate thread and doesnt interfere with the main app.
Issue-ID: ONAPARC-528
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py')
-rwxr-xr-x | vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py new file mode 100755 index 00000000..bf62f50f --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer +# from .producer.CustomKafkaProducer import CustomKafkaProducer + +import sys +import os, threading +import traceback +import json +import concurrent.futures +import logging + +from consumer import CustomKafkaConsumer +from producer import CustomKafkaProducer + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + +def main(): + #Begin: Sample producer based on file + customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer() + with open("./multithreading-metrics.json") as input_file: + for each_line in input_file: + python_obj = json.loads(each_line) + # print(python_obj["labels"]["__name__"]) + customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"]) + #END: Sample producer based on file + + customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer() + + #Form a data structure for query formation + queries = [] + queries.append({"metric_name" : "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"}) + queries.append({"metric_name" : 'go_gc_duration_seconds_count', "ip": "10.42.1.92:8686"}) + + executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + executor.submit(customKafkaConsumer.consume) + + while(True): + for each_record in queries: + list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"]) + logging.info("The records collected :: {}".format(list_of_records)) + logging.info("The length of records collected: {}".format(len(list_of_records))) + print("The records :: {}".format(list_of_records)) + + +if __name__ == '__main__': + main()
\ No newline at end of file |