aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
diff options
context:
space:
mode:
authorRajamohan Raj <rajamohan.raj@intel.com>2019-11-01 00:30:30 +0000
committerMarco Platania <platania@research.att.com>2019-11-14 14:10:09 +0000
commit53c9fab327d1d9a079154b01242cf0930c106989 (patch)
tree4cc6bc25a290c6ac3e6028122192de1dd9493646 /vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
parentfb9b7baa506e5c92bc243a30364e9f72ecd9c3f1 (diff)
First working draft of kafka for inference app
Created a python based inference app which can query a given metrics for a given duration from kafka topic. Consumer runs on separate thread and doesnt interfere with the main app. Issue-ID: ONAPARC-528 Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com> Change-Id: Ic84ea137b134385246bf11dee2ed6d34b593b956
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py')
-rw-r--r--vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
new file mode 100644
index 00000000..8f726bd9
--- /dev/null
+++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
@@ -0,0 +1,38 @@
+import logging
+from confluent_kafka import Producer
+import traceback
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+
+class CustomKafkaProducer:
+ def __init__(self):
+ self.topic_name = "metrics3"
+ #self.topic_name = "adatopic1"
+ conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092'
+ }
+ self.producer = Producer(**conf)
+
+
+ def produce(self, kafka_msg, kafka_key):
+ try:
+ self.producer.produce(topic=self.topic_name,
+ value=kafka_msg,
+ key=kafka_key,
+ callback=lambda err, msg: self.on_delivery(err, msg)
+ )
+ self.producer.flush()
+
+ except Exception as e:
+ #print("Error during producing to kafka topic. Stacktrace is %s",e)
+ logging.error("Error during producing to kafka topic.")
+ traceback.print_exc()
+
+
+ def on_delivery(self, err, msg):
+ if err:
+ print("Message failed delivery, error: %s", err)
+ logging.error('%s raised an error', err)
+ else:
+ logging.info("Message delivered to %s on partition %s",
+ msg.topic(), msg.partition()) \ No newline at end of file