diff options
author | Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com> | 2021-04-12 14:21:34 +0200 |
---|---|---|
committer | Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com> | 2021-04-12 14:23:27 +0200 |
commit | c9a9383bb208f8d846f2c83289635a1d4b95898c (patch) | |
tree | 6a179f362641e8bbb53ee230d764cdf6ccbfee97 /src/python | |
parent | b7428c3c9dcbd6dfbff1b1c5eb5720d3b1098936 (diff) |
change kafka send method to be async.
Issue-ID: INT-1869
Signed-off-by: Bartosz Gardziejewski <bartosz.gardziejewski@nokia.com>
Change-Id: I3301fd97f8f90443ebc4e3530b8c1fa10b5388c3
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/netconf_server/netconf_change_listener.py | 15 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_kafka_client.py | 4 |
2 files changed, 15 insertions, 4 deletions
diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py index 628f757..cdb5457 100644 --- a/src/python/netconf_server/netconf_change_listener.py +++ b/src/python/netconf_server/netconf_change_listener.py @@ -21,7 +21,6 @@ import logging from kafka.producer.future import FutureRecordMetadata - from netconf_server.netconf_kafka_client import NetconfKafkaClient from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData @@ -53,11 +52,19 @@ class NetconfChangeListener(object): kafka_message = NetconfChangeListener._create_kafka_message(change) logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic)) response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata - logging.info("Response from Kafka: {}".format(response.get(timeout=1))) - + self.set_up_callbacks_for_kafka_request(response) + logging.info("Module changes sent to Kafka") except Exception as e: logger.error("Exception occurred during handling of sysrepo config change", e) - logger.info("Module changes sent to Kafka. Operation finished.") + + @staticmethod + def set_up_callbacks_for_kafka_request(response): + response.add_callback( + lambda val: logging.info("Response from Kafka: {}".format(val)) + ) + response.add_errback( + lambda exc: logging.error("Exception from Kafka: {}".format(exc)) + ) @staticmethod def _create_kafka_message(change): diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py index 8687802..53e7ecd 100644 --- a/src/python/netconf_server/netconf_kafka_client.py +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -48,6 +48,10 @@ class NetconfKafkaClient(object): server = "{}:{}".format(host, port) producer = KafkaProducer( bootstrap_servers=server, + request_timeout_ms=15000, + retry_backoff_ms=1000, + max_in_flight_requests_per_connection=1, + retries=3, value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8) ) |