summaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
diff options
context:
space:
mode:
authorRajamohan Raj <rajamohan.raj@intel.com>2019-11-01 00:30:30 +0000
committerMarco Platania <platania@research.att.com>2019-11-14 14:10:09 +0000
commit53c9fab327d1d9a079154b01242cf0930c106989 (patch)
tree4cc6bc25a290c6ac3e6028122192de1dd9493646 /vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
parentfb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (diff)
First working draft of kafka for inference app
Created a python based inference app which can query a given metrics for a given duration from kafka topic. Consumer runs on separate thread and doesnt interfere with the main app. Issue-ID: ONAPARC-528 Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com> Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py')
-rwxr-xr-xvnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
new file mode 100755
index 00000000..bf62f50f
--- /dev/null
+++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/main.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+# from .consumer.CustomKafkaConsumer import CustomKafkaConsumer
+# from .producer.CustomKafkaProducer import CustomKafkaProducer
+
+import sys
+import os, threading
+import traceback
+import json
+import concurrent.futures
+import logging
+
+from consumer import CustomKafkaConsumer
+from producer import CustomKafkaProducer
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+def main():
+ #Begin: Sample producer based on file
+ customKafkaProducer = CustomKafkaProducer.CustomKafkaProducer()
+ with open("./multithreading-metrics.json") as input_file:
+ for each_line in input_file:
+ python_obj = json.loads(each_line)
+ # print(python_obj["labels"]["__name__"])
+ customKafkaProducer.produce(each_line, python_obj["labels"]["__name__"])
+ #END: Sample producer based on file
+
+ customKafkaConsumer = CustomKafkaConsumer.CustomKafkaConsumer()
+
+ #Form a data structure for query formation
+ queries = []
+ queries.append({"metric_name" : "go_gc_duration_seconds_count", "ip": "10.42.1.93:8686"})
+ queries.append({"metric_name" : 'go_gc_duration_seconds_count', "ip": "10.42.1.92:8686"})
+
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+ executor.submit(customKafkaConsumer.consume)
+
+ while(True):
+ for each_record in queries:
+ list_of_records = customKafkaConsumer.executeQuery(each_record["metric_name"], each_record["ip"])
+ logging.info("The records collected :: {}".format(list_of_records))
+ logging.info("The length of records collected: {}".format(len(list_of_records)))
+ print("The records :: {}".format(list_of_records))
+
+
+if __name__ == '__main__':
+ main() \ No newline at end of file