aboutsummaryrefslogtreecommitdiffstats
path: root/src/python/tests/unit/test_netconf_kafka_message_factory.py
blob: b899bd9f03267a337dc07673016ed20b4adca319 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from collections import namedtuple
from unittest import TestCase
from unittest.mock import MagicMock

import sys

# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
sys.modules['sysrepo'] = MagicMock()

from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage

from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory

SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val'])


class TestNetconfKafkaMessageFactory(TestCase):
    def test_should_return_empty_dict_when_sysrepo_message_is_none(self):
        # when
        actual = NetconfKafkaMessageFactory.create(None)

        # then
        self.assertEqual({}, actual)

    def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self):
        # given
        s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None)

        sysrepo_message = SysrepoMessage(s)
        sysrepo_message.is_modified = lambda: False
        sysrepo_message.is_created = lambda: True

        # when
        actual = NetconfKafkaMessageFactory.create(sysrepo_message)

        # then
        self.assertEqual(
            {'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}},
            actual
        )

    def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self):
        # given
        s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None)

        sysrepo_message = SysrepoMessage(s)
        sysrepo_message.is_modified = lambda: True
        sysrepo_message.is_created = lambda: False

        # when
        actual = NetconfKafkaMessageFactory.create(sysrepo_message)

        # then
        self.assertEqual(
            {'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
            actual
        )

    def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self):
        # given
        s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44)

        sysrepo_message = SysrepoMessage(s)
        sysrepo_message.is_modified = lambda: True
        sysrepo_message.is_created = lambda: False

        # when
        actual = NetconfKafkaMessageFactory.create(sysrepo_message)

        # then
        self.assertEqual(
            {'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
            actual
        )