aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py')
-rw-r--r--vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
new file mode 100644
index 00000000..8f726bd9
--- /dev/null
+++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py
@@ -0,0 +1,38 @@
+import logging
+from confluent_kafka import Producer
+import traceback
+
+logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
+
+
+class CustomKafkaProducer:
+ def __init__(self):
+ self.topic_name = "metrics3"
+ #self.topic_name = "adatopic1"
+ conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092'
+ }
+ self.producer = Producer(**conf)
+
+
+ def produce(self, kafka_msg, kafka_key):
+ try:
+ self.producer.produce(topic=self.topic_name,
+ value=kafka_msg,
+ key=kafka_key,
+ callback=lambda err, msg: self.on_delivery(err, msg)
+ )
+ self.producer.flush()
+
+ except Exception as e:
+ #print("Error during producing to kafka topic. Stacktrace is %s",e)
+ logging.error("Error during producing to kafka topic.")
+ traceback.print_exc()
+
+
+ def on_delivery(self, err, msg):
+ if err:
+ print("Message failed delivery, error: %s", err)
+ logging.error('%s raised an error', err)
+ else:
+ logging.info("Message delivered to %s on partition %s",
+ msg.topic(), msg.partition()) \ No newline at end of file