aboutsummaryrefslogtreecommitdiffstats
path: root/src/python/netconf_server/netconf_kafka_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/netconf_server/netconf_kafka_client.py')
-rw-r--r--src/python/netconf_server/netconf_kafka_client.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py
index b0effc3..8687802 100644
--- a/src/python/netconf_server/netconf_kafka_client.py
+++ b/src/python/netconf_server/netconf_kafka_client.py
@@ -18,32 +18,33 @@
# ============LICENSE_END=========================================================
###
import logging
-from json import dumps, loads
-from typing import Callable
+from json import dumps
+from typing import Callable, Any
from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import NoBrokersAvailable
from kafka.producer.future import FutureRecordMetadata
+from retry import retry
+
+from netconf_server.kafka_consumer_factory import provide_kafka_consumer
STANDARD_CHARSETS_UTF8 = 'utf-8'
-logger = logging.getLogger("netconf_kafka_client")
+logger = logging.getLogger(__name__)
-def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
- return KafkaConsumer(topic,
- consumer_timeout_ms=1000,
- group_id='netconf-group',
- auto_offset_reset='earliest',
- enable_auto_commit=True,
- bootstrap_servers=[server],
- value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
- )
+@retry(NoBrokersAvailable, tries=3, delay=5)
+def provide_configured_kafka_client(kafka_host_name, kafka_port):
+ return NetconfKafkaClient.create(
+ host=kafka_host_name,
+ port=kafka_port
+ ) # type: NetconfKafkaClient
class NetconfKafkaClient(object):
@staticmethod
- def create(host: str, port: int) -> object:
+ def create(host: str, port: int):
server = "{}:{}".format(host, port)
producer = KafkaProducer(
bootstrap_servers=server,
@@ -59,7 +60,7 @@ class NetconfKafkaClient(object):
self._producer = producer
self._get_kafka_consumer = get_kafka_consumer_func
- def send(self, topic: str, value: str) -> FutureRecordMetadata:
+ def send(self, topic: str, value: Any) -> FutureRecordMetadata:
return self._producer.send(
topic=topic,
value=value
@@ -72,7 +73,7 @@ class NetconfKafkaClient(object):
consumer = self._get_kafka_consumer(topic)
for message in consumer:
message_value = message.value
- logger.debug("Fetched config change %s" % message_value)
+ logger.info("Fetched config change %s" % message_value)
messages.append(message_value)
return messages