diff options
author | Rajamohan Raj <rajamohan.raj@intel.com> | 2019-11-01 00:30:30 +0000 |
---|---|---|
committer | Marco Platania <platania@research.att.com> | 2019-11-14 14:10:09 +0000 |
commit | 53c9fab327d1d9a079154b01242cf0930c106989 (patch) | |
tree | 4cc6bc25a290c6ac3e6028122192de1dd9493646 /vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer | |
parent | fb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (diff) |
First working draft of kafka for inference app
Created a python based inference app which can query a given metrics for
a given duration from kafka topic.
Consumer runs on separate thread and doesnt interfere with the main app.
Issue-ID: ONAPARC-528
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer')
2 files changed, 38 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py new file mode 100644 index 00000000..8f726bd9 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py @@ -0,0 +1,38 @@ +import logging +from confluent_kafka import Producer +import traceback + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + + +class CustomKafkaProducer: + def __init__(self): + self.topic_name = "metrics3" + #self.topic_name = "adatopic1" + conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092' + } + self.producer = Producer(**conf) + + + def produce(self, kafka_msg, kafka_key): + try: + self.producer.produce(topic=self.topic_name, + value=kafka_msg, + key=kafka_key, + callback=lambda err, msg: self.on_delivery(err, msg) + ) + self.producer.flush() + + except Exception as e: + #print("Error during producing to kafka topic. Stacktrace is %s",e) + logging.error("Error during producing to kafka topic.") + traceback.print_exc() + + + def on_delivery(self, err, msg): + if err: + print("Message failed delivery, error: %s", err) + logging.error('%s raised an error', err) + else: + logging.info("Message delivered to %s on partition %s", + msg.topic(), msg.partition())
\ No newline at end of file diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/__init__.py |