aboutsummaryrefslogtreecommitdiffstats
path: root/src/python/tests/unit/test_netconf_kafka_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/tests/unit/test_netconf_kafka_client.py')
-rw-r--r--src/python/tests/unit/test_netconf_kafka_client.py60
1 files changed, 60 insertions, 0 deletions
diff --git a/src/python/tests/unit/test_netconf_kafka_client.py b/src/python/tests/unit/test_netconf_kafka_client.py
new file mode 100644
index 0000000..9eff761
--- /dev/null
+++ b/src/python/tests/unit/test_netconf_kafka_client.py
@@ -0,0 +1,60 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+from unittest import TestCase
+from unittest.mock import MagicMock, call
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+
+MESSAGE_1 = "{'number':1}"
+MESSAGE_2 = "{'number':2}"
+
+TOPIC_NAME = 'config'
+
+
+class TestNetconfKafkaClient(TestCase):
+
+ def setUp(self):
+ self.producer = MagicMock()
+ self.kafka_customer_func = MagicMock(
+ return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)]
+ )
+ self.test_obj = NetconfKafkaClient(
+ producer=self.producer,
+ get_kafka_consumer_func=self.kafka_customer_func
+ )
+
+ def test_create_instance(self):
+ self.assertIsNotNone(self.test_obj)
+
+ def test_send_a_message_to_kafka(self):
+ # when
+ self.test_obj.send(TOPIC_NAME, MESSAGE_1)
+
+ # then
+ self.producer.assert_has_calls([call.send(topic=TOPIC_NAME, value=MESSAGE_1)])
+
+ def test_get_all_messages_from_kafka_topic(self):
+ # when
+ messages = self.test_obj.get_all_messages_from(TOPIC_NAME)
+
+ # then
+ self.assertTrue(len(messages) == 2)
+ self.assertTrue(MESSAGE_1 in messages)
+ self.assertTrue(MESSAGE_2 in messages)