diff options
Diffstat (limited to 'src/python/netconf_server/netconf_kafka_client.py')
-rw-r--r-- | src/python/netconf_server/netconf_kafka_client.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py index b0effc3..8687802 100644 --- a/src/python/netconf_server/netconf_kafka_client.py +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -18,32 +18,33 @@ # ============LICENSE_END========================================================= ### import logging -from json import dumps, loads -from typing import Callable +from json import dumps +from typing import Callable, Any from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import NoBrokersAvailable from kafka.producer.future import FutureRecordMetadata +from retry import retry + +from netconf_server.kafka_consumer_factory import provide_kafka_consumer STANDARD_CHARSETS_UTF8 = 'utf-8' -logger = logging.getLogger("netconf_kafka_client") +logger = logging.getLogger(__name__) -def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer: - return KafkaConsumer(topic, - consumer_timeout_ms=1000, - group_id='netconf-group', - auto_offset_reset='earliest', - enable_auto_commit=True, - bootstrap_servers=[server], - value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8)) - ) +@retry(NoBrokersAvailable, tries=3, delay=5) +def provide_configured_kafka_client(kafka_host_name, kafka_port): + return NetconfKafkaClient.create( + host=kafka_host_name, + port=kafka_port + ) # type: NetconfKafkaClient class NetconfKafkaClient(object): @staticmethod - def create(host: str, port: int) -> object: + def create(host: str, port: int): server = "{}:{}".format(host, port) producer = KafkaProducer( bootstrap_servers=server, @@ -59,7 +60,7 @@ class NetconfKafkaClient(object): self._producer = producer self._get_kafka_consumer = get_kafka_consumer_func - def send(self, topic: str, value: str) -> FutureRecordMetadata: + def send(self, topic: str, value: Any) -> FutureRecordMetadata: return self._producer.send( topic=topic, value=value @@ -72,7 +73,7 @@ class NetconfKafkaClient(object): consumer = self._get_kafka_consumer(topic) for message in consumer: message_value = message.value - logger.debug("Fetched config change %s" % message_value) + logger.info("Fetched config change %s" % message_value) messages.append(message_value) return messages |