1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import logging
from confluent_kafka import Producer
import traceback
# Configure the root logger when this module is imported: INFO level, with
# records rendered as "timestamp::pid::level::message" and timestamps
# formatted like "05-Jan-24 13:45:10".
logging.basicConfig(format='%(asctime)s::%(process)d::%(levelname)s::%(message)s', level=logging.INFO, datefmt='%d-%b-%y %H:%M:%S')
class CustomKafkaProducer:
    """Small wrapper around a ``confluent_kafka.Producer`` bound to one topic.

    Each call to :meth:`produce` enqueues a single message and then flushes
    the producer. Delivery results are reported through :meth:`on_delivery`.
    """

    def __init__(self, topic_name="metrics3",
                 bootstrap_servers="kafka-cluster-kafka-bootstrap:9092"):
        """Create the underlying producer.

        Args:
            topic_name: Topic every message is sent to.
            bootstrap_servers: Value for the ``bootstrap.servers`` producer
                setting.
        """
        self.topic_name = topic_name
        conf = {"bootstrap.servers": bootstrap_servers}
        self.producer = Producer(**conf)

    def produce(self, kafka_msg, kafka_key):
        """Send one message to ``self.topic_name`` and flush the producer.

        Any exception raised while producing or flushing is logged together
        with its traceback and then swallowed, so this method never raises
        to the caller.

        Args:
            kafka_msg: Message payload, passed through as ``value``.
            kafka_key: Message key, passed through as ``key``.
        """
        try:
            self.producer.produce(
                topic=self.topic_name,
                value=kafka_msg,
                key=kafka_key,
                # The bound method already has the (err, msg) signature, so
                # no forwarding lambda is needed.
                callback=self.on_delivery,
            )
            # Flushing after every message keeps the original one-at-a-time
            # behaviour of this wrapper.
            self.producer.flush()
        except Exception:
            # logging.exception records the message at ERROR level and
            # appends the active traceback.
            logging.exception("Error during producing to kafka topic.")

    def on_delivery(self, err, msg):
        """Log the outcome of a delivery attempt.

        Args:
            err: Falsy on success; otherwise the delivery error.
            msg: The message object; ``topic()`` and ``partition()`` are
                read from it on success.
        """
        if err:
            # The previous print() call never interpolated "%s"; let the
            # logger do the formatting instead.
            logging.error("Message failed delivery, error: %s", err)
        else:
            logging.info("Message delivered to %s on partition %s",
                         msg.topic(), msg.partition())
|