aboutsummaryrefslogtreecommitdiffstats
path: root/src/python/tests/unit/test_netconf_kafka_message_factory.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/tests/unit/test_netconf_kafka_message_factory.py')
-rw-r--r--src/python/tests/unit/test_netconf_kafka_message_factory.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/src/python/tests/unit/test_netconf_kafka_message_factory.py b/src/python/tests/unit/test_netconf_kafka_message_factory.py
new file mode 100644
index 0000000..b899bd9
--- /dev/null
+++ b/src/python/tests/unit/test_netconf_kafka_message_factory.py
@@ -0,0 +1,75 @@
+from collections import namedtuple
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+sys.modules['sysrepo'] = MagicMock()
+
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
+
+from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
+
+SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val'])
+
+
+class TestNetconfKafkaMessageFactory(TestCase):
+ def test_should_return_empty_dict_when_sysrepo_message_is_none(self):
+ # when
+ actual = NetconfKafkaMessageFactory.create(None)
+
+ # then
+ self.assertEqual({}, actual)
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: False
+ sysrepo_message.is_created = lambda: True
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}},
+ actual
+ )
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: True
+ sysrepo_message.is_created = lambda: False
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+ actual
+ )
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: True
+ sysrepo_message.is_created = lambda: False
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+ actual
+ )
+