diff options
author | Bogumil Zebek <bogumil.zebek@nokia.com> | 2021-03-30 15:01:37 +0200 |
---|---|---|
committer | Zebek Bogumil <bogumil.zebek@nokia.com> | 2021-04-02 07:39:48 +0200 |
commit | c0d47aca4a13b239e51772fa366fa780ec7812da (patch) | |
tree | 7561c4f7654de3dba3d0e1c1de3f8c68fe75fff4 | |
parent | 06daadc4403397935c647dca2bbb92459864d12a (diff) |
Add Kafka support
- send changes on Kafka topic
- add endpoint for fetching changes from Kafka
Signed-off-by: Bogumil Zebek <bogumil.zebek@nokia.com>
Issue-ID: INT-1869
Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com>
Change-Id: I349fdc4295659fc69407b5b1138281e2673f7938
18 files changed, 407 insertions, 54 deletions
@@ -20,6 +20,6 @@ RUN mkdir -p /resources/certs && \ ENV ENABLE_TLS=false ENV KAFKA_HOST_NAME="kafka1" ENV KAFKA_PORT=9092 -ENV KAFKA_TOPIC="config:1:1" +ENV KAFKA_TOPIC="config" ENTRYPOINT ["./scripts/set-up-netopeer.sh", "/resources/models", "/resources/certs"] @@ -68,6 +68,21 @@ </arguments> </configuration> </execution> + <execution> + <id>python-clean</id> + <phase>clean</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <workingDirectory>./src/python</workingDirectory> + <executable>rm</executable> + <arguments> + <argument>-rf</argument> + <argument>.tox</argument> + </arguments> + </configuration> + </execution> </executions> </plugin> </plugins> diff --git a/src/python/netconf_change_listener_application.py b/src/python/netconf_change_listener_application.py index 34b00b3..400ff3d 100644 --- a/src/python/netconf_change_listener_application.py +++ b/src/python/netconf_change_listener_application.py @@ -21,7 +21,6 @@ import asyncio import sys import logging -from typing import Tuple from netconf_server.netconf_app_configuration import NetconfAppConfiguration @@ -33,7 +32,7 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient logging.basicConfig( handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_change_listener.log")], - level=logging.DEBUG + level=logging.INFO ) logger = logging.getLogger("netconf_change_listener") @@ -43,9 +42,9 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen asyncio.get_event_loop().run_forever() -def create_change_listener(module_configuration_file_path: str) -> NetconfChangeListener: - configuration = SysrepoConfigurationLoader.load_configuration(module_configuration_file_path) - return NetconfChangeListenerFactory(configuration.models_to_subscribe_to).create() +def create_change_listener(app_configuration: NetconfAppConfiguration) -> NetconfChangeListener: + configuration = SysrepoConfigurationLoader.load_configuration(app_configuration.module_configuration_file_path) + return NetconfChangeListenerFactory(configuration.models_to_subscribe_to, app_configuration).create() if __name__ == "__main__": @@ -54,7 +53,7 @@ if __name__ == "__main__": if app_configuration: logger.info("Netconf change listener application configuration: {}".format(app_configuration)) try: - netconf_change_listener = create_change_listener(app_configuration.module_configuration_file_path) + netconf_change_listener = create_change_listener(app_configuration) SysrepoClient().run_in_session(run_server_forever, netconf_change_listener) except ConfigLoadingException: logger.error("File to load configuration from file %s" % app_configuration.module_configuration_file_path) diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py index 0a040c9..b13a084 100644 --- a/src/python/netconf_rest_application.py +++ b/src/python/netconf_rest_application.py @@ -21,6 +21,7 @@ import sys import logging from netconf_server.netconf_app_configuration import NetconfAppConfiguration +from netconf_server.netconf_kafka_client import provide_configured_kafka_client from netconf_server.netconf_rest_server import NetconfRestServer from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager @@ -28,14 +29,18 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient logging.basicConfig( handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_rest_application.log")], - level=logging.DEBUG + level=logging.INFO ) logger = logging.getLogger("netconf_rest_application") def start_rest_server(session, connection, server_rest: NetconfRestServer, netconf_app_configuration: NetconfAppConfiguration): sysrepo_cfg_manager = create_conf_manager(session, connection) - server_rest.start(sysrepo_cfg_manager, netconf_app_configuration) + kafka_client = provide_configured_kafka_client( + netconf_app_configuration.kafka_host_name, + netconf_app_configuration.kafka_port + ) + server_rest.start(sysrepo_cfg_manager, kafka_client, netconf_app_configuration.kafka_topic) def create_rest_server() -> NetconfRestServer: diff --git a/src/python/netconf_server/kafka_consumer_factory.py b/src/python/netconf_server/kafka_consumer_factory.py new file mode 100644 index 0000000..332cd21 --- /dev/null +++ b/src/python/netconf_server/kafka_consumer_factory.py @@ -0,0 +1,35 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +from json import loads + +from kafka import KafkaConsumer + +STANDARD_CHARSETS_UTF8 = 'utf-8' + + +def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer: + return KafkaConsumer(topic, + consumer_timeout_ms=1000, + group_id='netconf-group', + auto_offset_reset='earliest', + enable_auto_commit=False, + bootstrap_servers=[server], + value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8)) + ) diff --git a/src/python/netconf_server/netconf_app_configuration.py b/src/python/netconf_server/netconf_app_configuration.py index 0e25a41..1aaf12c 100644 --- a/src/python/netconf_server/netconf_app_configuration.py +++ b/src/python/netconf_server/netconf_app_configuration.py @@ -38,6 +38,7 @@ class NetconfAppConfiguration(object): self.kafka_topic = kafka_topic def __str__(self): - return "NetconfAppConfiguration[configuration_file -> '{}', kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\ + return "NetconfAppConfiguration[configuration_file -> '{}', " \ + "kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\ .format(self.module_configuration_file_path, self.kafka_host_name, self.kafka_port, self.kafka_topic) diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py index 44910d1..628f757 100644 --- a/src/python/netconf_server/netconf_change_listener.py +++ b/src/python/netconf_server/netconf_change_listener.py @@ -19,21 +19,46 @@ ### import logging +from kafka.producer.future import FutureRecordMetadata + + +from netconf_server.netconf_kafka_client import NetconfKafkaClient +from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData +from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage logger = logging.getLogger(__name__) class NetconfChangeListener(object): - def __init__(self, subscriptions: list): + def __init__(self, subscriptions: list, kafka_client: NetconfKafkaClient, topic: str): self.subscriptions = subscriptions + self.kafka_client = kafka_client + self.topic = topic def run(self, session): for subscription in self.subscriptions: - subscription.callback_function = self.__on_module_configuration_change + subscription.callback_function = self._on_module_configuration_change subscription.subscribe_on_model_change(session) + def _on_module_configuration_change(self, config_change_data: ConfigChangeData): + logger.info("Received module changed: {} , {} ".format(config_change_data.event, config_change_data.changes)) + if config_change_data.event != "done": + self._send_change_to_kafka(config_change_data) + + def _send_change_to_kafka(self, config_change_data): + for change in config_change_data.changes: + try: + kafka_message = NetconfChangeListener._create_kafka_message(change) + logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic)) + response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata + logging.info("Response from Kafka: {}".format(response.get(timeout=1))) + + except Exception as e: + logger.error("Exception occurred during handling of sysrepo config change", e) + logger.info("Module changes sent to Kafka. Operation finished.") + @staticmethod - def __on_module_configuration_change(config_change_data: ConfigChangeData): - logger.info("Received module changed: %s , %s " % (config_change_data.event, config_change_data.changes)) + def _create_kafka_message(change): + return NetconfKafkaMessageFactory.create(SysrepoMessage(change)) diff --git a/src/python/netconf_server/netconf_change_listener_factory.py b/src/python/netconf_server/netconf_change_listener_factory.py index fa5e071..9b9ff24 100644 --- a/src/python/netconf_server/netconf_change_listener_factory.py +++ b/src/python/netconf_server/netconf_change_listener_factory.py @@ -19,7 +19,10 @@ ### import logging + +from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_change_listener import NetconfChangeListener +from netconf_server.netconf_kafka_client import NetconfKafkaClient, provide_configured_kafka_client from netconf_server.sysrepo_interface.config_change_subscriber import ConfigChangeSubscriber logger = logging.getLogger(__name__) @@ -27,8 +30,9 @@ logger = logging.getLogger(__name__) class NetconfChangeListenerFactory(object): - def __init__(self, modules_to_subscribe_names: list): + def __init__(self, modules_to_subscribe_names: list, app_configuration: NetconfAppConfiguration): self.modules_to_subscribe_names = modules_to_subscribe_names + self.app_configuration = app_configuration def create(self) -> NetconfChangeListener: subscriptions = list() @@ -36,5 +40,13 @@ class NetconfChangeListenerFactory(object): subscriptions.append( ConfigChangeSubscriber(module_name) ) - return NetconfChangeListener(subscriptions) + kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client( + self.app_configuration.kafka_host_name, + self.app_configuration.kafka_port + ) + + return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic) + @staticmethod + def _try_to_create_kafka_client(kafka_host_name: str, kafka_port: int): + return provide_configured_kafka_client(kafka_host_name, kafka_port) # type: NetconfKafkaClient diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py index b0effc3..8687802 100644 --- a/src/python/netconf_server/netconf_kafka_client.py +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -18,32 +18,33 @@ # ============LICENSE_END========================================================= ### import logging -from json import dumps, loads -from typing import Callable +from json import dumps +from typing import Callable, Any from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import NoBrokersAvailable from kafka.producer.future import FutureRecordMetadata +from retry import retry + +from netconf_server.kafka_consumer_factory import provide_kafka_consumer STANDARD_CHARSETS_UTF8 = 'utf-8' -logger = logging.getLogger("netconf_kafka_client") +logger = logging.getLogger(__name__) -def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer: - return KafkaConsumer(topic, - consumer_timeout_ms=1000, - group_id='netconf-group', - auto_offset_reset='earliest', - enable_auto_commit=True, - bootstrap_servers=[server], - value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8)) - ) +@retry(NoBrokersAvailable, tries=3, delay=5) +def provide_configured_kafka_client(kafka_host_name, kafka_port): + return NetconfKafkaClient.create( + host=kafka_host_name, + port=kafka_port + ) # type: NetconfKafkaClient class NetconfKafkaClient(object): @staticmethod - def create(host: str, port: int) -> object: + def create(host: str, port: int): server = "{}:{}".format(host, port) producer = KafkaProducer( bootstrap_servers=server, @@ -59,7 +60,7 @@ class NetconfKafkaClient(object): self._producer = producer self._get_kafka_consumer = get_kafka_consumer_func - def send(self, topic: str, value: str) -> FutureRecordMetadata: + def send(self, topic: str, value: Any) -> FutureRecordMetadata: return self._producer.send( topic=topic, value=value @@ -72,7 +73,7 @@ class NetconfKafkaClient(object): consumer = self._get_kafka_consumer(topic) for message in consumer: message_value = message.value - logger.debug("Fetched config change %s" % message_value) + logger.info("Fetched config change %s" % message_value) messages.append(message_value) return messages diff --git a/src/python/netconf_server/netconf_kafka_message_factory.py b/src/python/netconf_server/netconf_kafka_message_factory.py new file mode 100644 index 0000000..e39556e --- /dev/null +++ b/src/python/netconf_server/netconf_kafka_message_factory.py @@ -0,0 +1,54 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +import logging + +from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage + +logger = logging.getLogger(__name__) + + +class NetconfKafkaMessageFactory(object): + + @classmethod + def create(cls, change: SysrepoMessage) -> dict: + message = {} + if change and change.is_modified(): + logger.debug("Parsing change modified") + message = cls._create_modified_message(change) + elif change and change.is_created(): + logger.debug("Parsing change created") + message = cls._create_created_message(change) + return message + + @classmethod + def _create_created_message(cls, change: SysrepoMessage) -> dict: + message = {"type": "ChangeCreated"} + if change.value(): + message["new"] = {"path": change.xpath(), "value": change.value()} + return message + + @classmethod + def _create_modified_message(cls, change: SysrepoMessage) -> dict: + message = {"type": "ChangeModified"} + if change.prev_val(): + message["old"] = {"path": change.xpath(), "value": change.prev_val()} + if change.value(): + message["new"] = {"path": change.xpath(), "value": change.value()} + return message diff --git a/src/python/netconf_server/netconf_rest_server.py b/src/python/netconf_server/netconf_rest_server.py index 8568454..edaa6e8 100644 --- a/src/python/netconf_server/netconf_rest_server.py +++ b/src/python/netconf_server/netconf_rest_server.py @@ -18,9 +18,8 @@ # ============LICENSE_END========================================================= ### -from flask import Flask, logging, make_response, Response, request +from flask import Flask, logging, make_response, Response, request, jsonify -from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_kafka_client import NetconfKafkaClient from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager @@ -29,7 +28,8 @@ class NetconfRestServer: _rest_server: Flask = Flask("server") logger = logging.create_logger(_rest_server) _configuration_manager: SysrepoConfigurationManager - _app_configuration: NetconfAppConfiguration + _kafka_topic: str + _kafka_client: NetconfKafkaClient def __init__(self, host='0.0.0.0', port=6555): self._host = host @@ -37,9 +37,11 @@ class NetconfRestServer: def start(self, configuration_manager: SysrepoConfigurationManager, - netconf_app_configuration: NetconfAppConfiguration): + kafka_client: NetconfKafkaClient, + kafka_topic: str): NetconfRestServer._configuration_manager = configuration_manager - NetconfRestServer._app_configuration = netconf_app_configuration + NetconfRestServer._kafka_client = kafka_client + NetconfRestServer._kafka_topic = kafka_topic Flask.run( NetconfRestServer._rest_server, host=self._host, @@ -54,22 +56,11 @@ class NetconfRestServer: @staticmethod @_rest_server.route("/readiness") def _readiness_check(): - try: - NetconfRestServer.__try_connect_to_kafka() + if NetconfRestServer._kafka_client: return Response('Ready', status=200) - except Exception as e: - NetconfRestServer.logger.error("Unable to create a Kafka client", e) + else: return Response('Not Ready', status=503) - # if Kafka is up & running and hostname with port is proper, then client will be created; otherwise - # an error will be reported - @staticmethod - def __try_connect_to_kafka(): - NetconfKafkaClient.create( - host=NetconfRestServer._app_configuration.kafka_host_name, - port=NetconfRestServer._app_configuration.kafka_port - ) - @staticmethod @_rest_server.route("/change_config/<path:module_name>", methods=['POST']) def _change_config(module_name): @@ -78,12 +69,17 @@ class NetconfRestServer: return NetconfRestServer.__create_http_response(202, "Accepted") @staticmethod + @_rest_server.route("/change_history") + def _change_history(): + history = NetconfRestServer._kafka_client.get_all_messages_from(NetconfRestServer._kafka_topic) + return jsonify(history), 200 + + @staticmethod @_rest_server.route("/get_config/<path:module_name>", methods=['GET']) def _get_config(module_name): data = NetconfRestServer._configuration_manager.get_configuration(module_name) return NetconfRestServer.__create_http_response(200, data) - @staticmethod def __create_http_response(code, message): return make_response( diff --git a/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py b/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py index faa8254..e3e09f0 100644 --- a/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py +++ b/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py @@ -22,7 +22,7 @@ import logging from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData -logger = logging.getLogger("sysrep_config_change_subscriber") +logger = logging.getLogger(__name__) class ConfigChangeSubscriber(object): diff --git a/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py b/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py new file mode 100644 index 0000000..e21a9b5 --- /dev/null +++ b/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py @@ -0,0 +1,42 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +import sysrepo + + +class SysrepoMessage(object): + + def __init__(self, change): + self._change = change + + def is_modified(self): + return isinstance(self._change, sysrepo.ChangeModified) + + def is_created(self): + return isinstance(self._change, sysrepo.ChangeCreated) + + def value(self): + return self._change.value + + def xpath(self): + return self._change.xpath + + def prev_val(self): + return self._change.prev_val diff --git a/src/python/requirements.txt b/src/python/requirements.txt index eb07c40..1654c50 100644 --- a/src/python/requirements.txt +++ b/src/python/requirements.txt @@ -21,3 +21,4 @@ sysrepo==0.4.2 Flask==1.1.1 kafka-python==2.0.2 +retry==0.9.2 diff --git a/src/python/test-requirements.txt b/src/python/test-requirements.txt index 4983061..75de922 100644 --- a/src/python/test-requirements.txt +++ b/src/python/test-requirements.txt @@ -20,4 +20,5 @@ pytest==6.2.2 kafka-python==2.0.2 +retry==0.9.2 diff --git a/src/python/tests/unit/test_netconf_change_listener.py b/src/python/tests/unit/test_netconf_change_listener.py new file mode 100644 index 0000000..c9e11d3 --- /dev/null +++ b/src/python/tests/unit/test_netconf_change_listener.py @@ -0,0 +1,74 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +import unittest +from unittest.mock import MagicMock + +import sys + +# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution +from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData + +sys.modules['sysrepo'] = MagicMock() +from netconf_server.netconf_change_listener import NetconfChangeListener + +KAFKA_TOPIC = "config" + + +class TestNetconfChangeListener(unittest.TestCase): + def test_should_subscribe_on_run(self): + # given + subscriber1 = MagicMock() + subscriber2 = MagicMock() + kafka_client = MagicMock() + session = MagicMock() + netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC) + + # when + netconf_change_listener.run(session) + + # then + subscriber1.subscribe_on_model_change.assert_called_once() + self.assertEqual(subscriber1.callback_function, netconf_change_listener._on_module_configuration_change) + subscriber2.subscribe_on_model_change.assert_called_once() + self.assertEqual(subscriber2.callback_function, netconf_change_listener._on_module_configuration_change) + + def test_should_send_two_changes_at_kafka(self): + # given + subscriber1 = MagicMock() + subscriber2 = MagicMock() + kafka_client = MagicMock() + netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC) + NetconfChangeListener._create_kafka_message = lambda _: MagicMock() + + # when + netconf_change_listener._on_module_configuration_change( + ConfigChangeData( + event="event", + req_id=1, + changes=[MagicMock(), MagicMock()] + ) + ) + + # then + self.assertEqual(kafka_client.send.call_count, 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/tests/unit/test_netconf_chang_listener.py b/src/python/tests/unit/test_netconf_change_listener_factory.py index c2889f1..113f96c 100644 --- a/src/python/tests/unit/test_netconf_chang_listener.py +++ b/src/python/tests/unit/test_netconf_change_listener_factory.py @@ -20,16 +20,23 @@ import unittest from unittest.mock import MagicMock +import sys + +# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution +sys.modules['sysrepo'] = MagicMock() + +from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory from tests.mocs.mocked_session import MockedSession -class TestNetconfServer(unittest.TestCase): +class TestNetconfChangeListenerFactory(unittest.TestCase): def test_should_create_and_run_netconf_server_with_one_model(self): # given modules_to_subscribe_names = ["test"] - server = NetconfChangeListenerFactory(modules_to_subscribe_names).create() + factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names) + server = factory.create() session = MockedSession() session.subscribe_module_change = MagicMock() @@ -42,7 +49,8 @@ class TestNetconfServer(unittest.TestCase): def test_should_create_and_run_netconf_server_with_multiple_models(self): # given modules_to_subscribe_names = ["test", "test2", "test3"] - server = NetconfChangeListenerFactory(modules_to_subscribe_names).create() + factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names) + server = factory.create() session = MockedSession() session.subscribe_module_change = MagicMock() @@ -51,3 +59,12 @@ class TestNetconfServer(unittest.TestCase): # then self.assertEqual(session.subscribe_module_change.call_count, 3) + + @staticmethod + def _given_netconf_change_listener_factory(modules_to_subscribe_names: list) -> NetconfChangeListenerFactory: + app_configuration, _ = NetconfAppConfiguration.get_configuration( + ["../models", "models-configuration.ini", "127.0.0.1", "9092", + "kafka1"]) # type: NetconfAppConfiguration, str + factory = NetconfChangeListenerFactory(modules_to_subscribe_names, app_configuration) + NetconfChangeListenerFactory._try_to_create_kafka_client = lambda host, port: MagicMock() + return factory diff --git a/src/python/tests/unit/test_netconf_kafka_message_factory.py b/src/python/tests/unit/test_netconf_kafka_message_factory.py new file mode 100644 index 0000000..b899bd9 --- /dev/null +++ b/src/python/tests/unit/test_netconf_kafka_message_factory.py @@ -0,0 +1,75 @@ +from collections import namedtuple +from unittest import TestCase +from unittest.mock import MagicMock + +import sys + +# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution +sys.modules['sysrepo'] = MagicMock() + +from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage + +from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory + +SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val']) + + +class TestNetconfKafkaMessageFactory(TestCase): + def test_should_return_empty_dict_when_sysrepo_message_is_none(self): + # when + actual = NetconfKafkaMessageFactory.create(None) + + # then + self.assertEqual({}, actual) + + def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self): + # given + s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None) + + sysrepo_message = SysrepoMessage(s) + sysrepo_message.is_modified = lambda: False + sysrepo_message.is_created = lambda: True + + # when + actual = NetconfKafkaMessageFactory.create(sysrepo_message) + + # then + self.assertEqual( + {'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}}, + actual + ) + + def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self): + # given + s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None) + + sysrepo_message = SysrepoMessage(s) + sysrepo_message.is_modified = lambda: True + sysrepo_message.is_created = lambda: False + + # when + actual = NetconfKafkaMessageFactory.create(sysrepo_message) + + # then + self.assertEqual( + {'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}}, + actual + ) + + def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self): + # given + s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44) + + sysrepo_message = SysrepoMessage(s) + sysrepo_message.is_modified = lambda: True + sysrepo_message.is_created = lambda: False + + # when + actual = NetconfKafkaMessageFactory.create(sysrepo_message) + + # then + self.assertEqual( + {'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}}, + actual + ) + |