aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>2021-04-12 14:33:41 +0200
committerTomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>2021-04-12 14:46:17 +0200
commit37bb5c99e085364cd71bbb44b225f0eb37a7aac7 (patch)
tree07344eb633282698cac1d2a6ab737af38ea11ba3
parentc0d47aca4a13b239e51772fa366fa780ec7812da (diff)
Fix Kafka consumer
Change-Id: I8b52e2d1859b964a582289e2f63d70272d60f39a Issue-ID: INT-1869 Signed-off-by: Tomasz Pietruszkiewicz <tomasz.pietruszkiewicz@nokia.com>
-rw-r--r--src/python/netconf_server/kafka_consumer_factory.py2
-rw-r--r--src/python/netconf_server/netconf_kafka_client.py1
-rw-r--r--src/python/tests/unit/test_netconf_kafka_client.py21
3 files changed, 22 insertions, 2 deletions
diff --git a/src/python/netconf_server/kafka_consumer_factory.py b/src/python/netconf_server/kafka_consumer_factory.py
index 332cd21..43ef1cb 100644
--- a/src/python/netconf_server/kafka_consumer_factory.py
+++ b/src/python/netconf_server/kafka_consumer_factory.py
@@ -26,7 +26,7 @@ STANDARD_CHARSETS_UTF8 = 'utf-8'
def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
return KafkaConsumer(topic,
- consumer_timeout_ms=1000,
+ consumer_timeout_ms=5000,
group_id='netconf-group',
auto_offset_reset='earliest',
enable_auto_commit=False,
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py
index 8687802..027bde1 100644
--- a/src/python/netconf_server/netconf_kafka_client.py
+++ b/src/python/netconf_server/netconf_kafka_client.py
@@ -75,5 +75,6 @@ class NetconfKafkaClient(object):
message_value = message.value
logger.info("Fetched config change %s" % message_value)
messages.append(message_value)
+ consumer.close()
return messages
diff --git a/src/python/tests/unit/test_netconf_kafka_client.py b/src/python/tests/unit/test_netconf_kafka_client.py
index 9eff761..b3f45c1 100644
--- a/src/python/tests/unit/test_netconf_kafka_client.py
+++ b/src/python/tests/unit/test_netconf_kafka_client.py
@@ -32,8 +32,10 @@ class TestNetconfKafkaClient(TestCase):
def setUp(self):
self.producer = MagicMock()
+ self.kafkaConsumerResponse = KafkaConsumerResponse(MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2))
+ self.kafkaConsumerResponse.close = MagicMock()
self.kafka_customer_func = MagicMock(
- return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)]
+ return_value=self.kafkaConsumerResponse
)
self.test_obj = NetconfKafkaClient(
producer=self.producer,
@@ -58,3 +60,20 @@ class TestNetconfKafkaClient(TestCase):
self.assertTrue(len(messages) == 2)
self.assertTrue(MESSAGE_1 in messages)
self.assertTrue(MESSAGE_2 in messages)
+
+
+class KafkaConsumerResponse(list):
+
+ def __new__(self, *args, **kwargs):
+ return super(KafkaConsumerResponse, self).__new__(self, args, kwargs)
+
+ def __init__(self, *args, **kwargs):
+ if len(args) == 1 and hasattr(args[0], '__iter__'):
+ list.__init__(self, args[0])
+ else:
+ list.__init__(self, args)
+ self.__dict__.update(kwargs)
+
+ def __call__(self, **kwargs):
+ self.__dict__.update(kwargs)
+ return self