From 53c9fab327d1d9a079154b01242cf0930c106989 Mon Sep 17 00:00:00 2001 From: Rajamohan Raj Date: Fri, 1 Nov 2019 00:30:30 +0000 Subject: First working draft of kafka for inference app Created a python based inference app which can query a given metrics for a given duration from kafka topic. Consumer runs on separate thread and doesnt interfere with the main app. Issue-ID: ONAPARC-528 Signed-off-by: Rajamohan Raj Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956 --- .../src/producer/CustomKafkaProducer.py | 38 ++++++++++++++++++++++ .../src/producer/__init__.py | 0 2 files changed, 38 insertions(+) create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py create mode 100644 vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer') diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py new file mode 100644 index 00000000..8f726bd9 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py @@ -0,0 +1,38 @@ +import logging +from confluent_kafka import Producer +import traceback + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + + +class CustomKafkaProducer: + def __init__(self): + self.topic_name = "metrics3" + #self.topic_name = "adatopic1" + conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092' + } + self.producer = Producer(**conf) + + + def produce(self, kafka_msg, kafka_key): + try: + self.producer.produce(topic=self.topic_name, + value=kafka_msg, + key=kafka_key, + callback=lambda err, msg: self.on_delivery(err, msg) + ) + self.producer.flush() + + except Exception as e: + #print("Error during producing to kafka topic. Stacktrace is %s",e) + logging.error("Error during producing to kafka topic.") + traceback.print_exc() + + + def on_delivery(self, err, msg): + if err: + print("Message failed delivery, error: %s", err) + logging.error('%s raised an error', err) + else: + logging.info("Message delivered to %s on partition %s", + msg.topic(), msg.partition()) \ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py new file mode 100644 index 00000000..e69de29b -- cgit