1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
from collections import namedtuple
from unittest import TestCase
from unittest.mock import MagicMock
import sys
# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
sys.modules['sysrepo'] = MagicMock()
from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val'])
class TestNetconfKafkaMessageFactory(TestCase):
def test_should_return_empty_dict_when_sysrepo_message_is_none(self):
# when
actual = NetconfKafkaMessageFactory.create(None)
# then
self.assertEqual({}, actual)
def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self):
# given
s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None)
sysrepo_message = SysrepoMessage(s)
sysrepo_message.is_modified = lambda: False
sysrepo_message.is_created = lambda: True
# when
actual = NetconfKafkaMessageFactory.create(sysrepo_message)
# then
self.assertEqual(
{'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}},
actual
)
def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self):
# given
s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None)
sysrepo_message = SysrepoMessage(s)
sysrepo_message.is_modified = lambda: True
sysrepo_message.is_created = lambda: False
# when
actual = NetconfKafkaMessageFactory.create(sysrepo_message)
# then
self.assertEqual(
{'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
actual
)
def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self):
# given
s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44)
sysrepo_message = SysrepoMessage(s)
sysrepo_message.is_modified = lambda: True
sysrepo_message.is_created = lambda: False
# when
actual = NetconfKafkaMessageFactory.create(sysrepo_message)
# then
self.assertEqual(
{'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
actual
)
|