diff options
Diffstat (limited to 'vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py')
-rw-r--r-- | vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py new file mode 100644 index 00000000..8f726bd9 --- /dev/null +++ b/vnfs/DAaaS/microservices/PythonApps/python-kafkaConsumer-inference-app/src/producer/CustomKafkaProducer.py @@ -0,0 +1,38 @@ +import logging +from confluent_kafka import Producer +import traceback + +logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S') + + +class CustomKafkaProducer: + def __init__(self): + self.topic_name = "metrics3" + #self.topic_name = "adatopic1" + conf = {'bootstrap.servers': 'kafka-cluster-kafka-bootstrap:9092' + } + self.producer = Producer(**conf) + + + def produce(self, kafka_msg, kafka_key): + try: + self.producer.produce(topic=self.topic_name, + value=kafka_msg, + key=kafka_key, + callback=lambda err, msg: self.on_delivery(err, msg) + ) + self.producer.flush() + + except Exception as e: + #print("Error during producing to kafka topic. Stacktrace is %s",e) + logging.error("Error during producing to kafka topic.") + traceback.print_exc() + + + def on_delivery(self, err, msg): + if err: + print("Message failed delivery, error: %s", err) + logging.error('%s raised an error', err) + else: + logging.info("Message delivered to %s on partition %s", + msg.topic(), msg.partition())
\ No newline at end of file |