diff options
author | Tomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com> | 2021-04-12 14:33:41 +0200 |
---|---|---|
committer | Tomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com> | 2021-04-12 14:46:17 +0200 |
commit | 37bb5c99e085364cd71bbb44b225f0eb37a7aac7 (patch) | |
tree | 07344eb633282698cac1d2a6ab737af38ea11ba3 /src/python | |
parent | c0d47aca4a13b239e51772fa366fa780ec7812da (diff) |
Fix Kafka consumer
Change-Id: I8b52e2d1859b964a582289e2f63d70272d60f39a
Issue-ID: INT-1869
Signed-off-by: Tomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/netconf_server/kafka_consumer_factory.py | 2 | ||||
-rw-r--r-- | src/python/netconf_server/netconf_kafka_client.py | 1 | ||||
-rw-r--r-- | src/python/tests/unit/test_netconf_kafka_client.py | 21 |
3 files changed, 22 insertions, 2 deletions
diff --git a/src/python/netconf_server/kafka_consumer_factory.py b/src/python/netconf_server/kafka_consumer_factory.py index 332cd21..43ef1cb 100644 --- a/src/python/netconf_server/kafka_consumer_factory.py +++ b/src/python/netconf_server/kafka_consumer_factory.py @@ -26,7 +26,7 @@ STANDARD_CHARSETS_UTF8 = 'utf-8' def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer: return KafkaConsumer(topic, - consumer_timeout_ms=1000, + consumer_timeout_ms=5000, group_id='netconf-group', auto_offset_reset='earliest', enable_auto_commit=False, diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py index 8687802..027bde1 100644 --- a/src/python/netconf_server/netconf_kafka_client.py +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -75,5 +75,6 @@ class NetconfKafkaClient(object): message_value = message.value logger.info("Fetched config change %s" % message_value) messages.append(message_value) + consumer.close() return messages diff --git a/src/python/tests/unit/test_netconf_kafka_client.py b/src/python/tests/unit/test_netconf_kafka_client.py index 9eff761..b3f45c1 100644 --- a/src/python/tests/unit/test_netconf_kafka_client.py +++ b/src/python/tests/unit/test_netconf_kafka_client.py @@ -32,8 +32,10 @@ class TestNetconfKafkaClient(TestCase): def setUp(self): self.producer = MagicMock() + self.kafkaConsumerResponse = KafkaConsumerResponse(MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)) + self.kafkaConsumerResponse.close = MagicMock() self.kafka_customer_func = MagicMock( - return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)] + return_value=self.kafkaConsumerResponse ) self.test_obj = NetconfKafkaClient( producer=self.producer, @@ -58,3 +60,20 @@ class TestNetconfKafkaClient(TestCase): self.assertTrue(len(messages) == 2) self.assertTrue(MESSAGE_1 in messages) self.assertTrue(MESSAGE_2 in messages) + + +class KafkaConsumerResponse(list): + + def __new__(self, *args, **kwargs): + return super(KafkaConsumerResponse, self).__new__(self, args, kwargs) + + def __init__(self, *args, **kwargs): + if len(args) == 1 and hasattr(args[0], '__iter__'): + list.__init__(self, args[0]) + else: + list.__init__(self, args) + self.__dict__.update(kwargs) + + def __call__(self, **kwargs): + self.__dict__.update(kwargs) + return self |