aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBogumil Zebek <bogumil.zebek@nokia.com>2021-03-30 15:01:37 +0200
committerZebek Bogumil <bogumil.zebek@nokia.com>2021-04-02 07:39:48 +0200
commitc0d47aca4a13b239e51772fa366fa780ec7812da (patch)
tree7561c4f7654de3dba3d0e1c1de3f8c68fe75fff4
parent06daadc4403397935c647dca2bbb92459864d12a (diff)
Add Kafka support
- send changes on Kafka topic - add endpoint for fetching changes from Kafka Signed-off-by: Bogumil Zebek <bogumil.zebek@nokia.com> Issue-ID: INT-1869 Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com> Change-Id: I349fdc4295659fc69407b5b1138281e2673f7938
-rw-r--r--Dockerfile2
-rw-r--r--pom.xml15
-rw-r--r--src/python/netconf_change_listener_application.py11
-rw-r--r--src/python/netconf_rest_application.py9
-rw-r--r--src/python/netconf_server/kafka_consumer_factory.py35
-rw-r--r--src/python/netconf_server/netconf_app_configuration.py3
-rw-r--r--src/python/netconf_server/netconf_change_listener.py33
-rw-r--r--src/python/netconf_server/netconf_change_listener_factory.py16
-rw-r--r--src/python/netconf_server/netconf_kafka_client.py31
-rw-r--r--src/python/netconf_server/netconf_kafka_message_factory.py54
-rw-r--r--src/python/netconf_server/netconf_rest_server.py34
-rw-r--r--src/python/netconf_server/sysrepo_interface/config_change_subscriber.py2
-rw-r--r--src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py42
-rw-r--r--src/python/requirements.txt1
-rw-r--r--src/python/test-requirements.txt1
-rw-r--r--src/python/tests/unit/test_netconf_change_listener.py74
-rw-r--r--src/python/tests/unit/test_netconf_change_listener_factory.py (renamed from src/python/tests/unit/test_netconf_chang_listener.py)23
-rw-r--r--src/python/tests/unit/test_netconf_kafka_message_factory.py75
18 files changed, 407 insertions, 54 deletions
diff --git a/Dockerfile b/Dockerfile
index 1303b40..2ac0c5f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -20,6 +20,6 @@ RUN mkdir -p /resources/certs && \
ENV ENABLE_TLS=false
ENV KAFKA_HOST_NAME="kafka1"
ENV KAFKA_PORT=9092
-ENV KAFKA_TOPIC="config:1:1"
+ENV KAFKA_TOPIC="config"
ENTRYPOINT ["./scripts/set-up-netopeer.sh", "/resources/models", "/resources/certs"]
diff --git a/pom.xml b/pom.xml
index 3071906..b161eff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,21 @@
</arguments>
</configuration>
</execution>
+ <execution>
+ <id>python-clean</id>
+ <phase>clean</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <workingDirectory>./src/python</workingDirectory>
+ <executable>rm</executable>
+ <arguments>
+ <argument>-rf</argument>
+ <argument>.tox</argument>
+ </arguments>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
diff --git a/src/python/netconf_change_listener_application.py b/src/python/netconf_change_listener_application.py
index 34b00b3..400ff3d 100644
--- a/src/python/netconf_change_listener_application.py
+++ b/src/python/netconf_change_listener_application.py
@@ -21,7 +21,6 @@ import asyncio
import sys
import logging
-from typing import Tuple
from netconf_server.netconf_app_configuration import NetconfAppConfiguration
@@ -33,7 +32,7 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
logging.basicConfig(
handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_change_listener.log")],
- level=logging.DEBUG
+ level=logging.INFO
)
logger = logging.getLogger("netconf_change_listener")
@@ -43,9 +42,9 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen
asyncio.get_event_loop().run_forever()
-def create_change_listener(module_configuration_file_path: str) -> NetconfChangeListener:
- configuration = SysrepoConfigurationLoader.load_configuration(module_configuration_file_path)
- return NetconfChangeListenerFactory(configuration.models_to_subscribe_to).create()
+def create_change_listener(app_configuration: NetconfAppConfiguration) -> NetconfChangeListener:
+ configuration = SysrepoConfigurationLoader.load_configuration(app_configuration.module_configuration_file_path)
+ return NetconfChangeListenerFactory(configuration.models_to_subscribe_to, app_configuration).create()
if __name__ == "__main__":
@@ -54,7 +53,7 @@ if __name__ == "__main__":
if app_configuration:
logger.info("Netconf change listener application configuration: {}".format(app_configuration))
try:
- netconf_change_listener = create_change_listener(app_configuration.module_configuration_file_path)
+ netconf_change_listener = create_change_listener(app_configuration)
SysrepoClient().run_in_session(run_server_forever, netconf_change_listener)
except ConfigLoadingException:
logger.error("File to load configuration from file %s" % app_configuration.module_configuration_file_path)
diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py
index 0a040c9..b13a084 100644
--- a/src/python/netconf_rest_application.py
+++ b/src/python/netconf_rest_application.py
@@ -21,6 +21,7 @@ import sys
import logging
from netconf_server.netconf_app_configuration import NetconfAppConfiguration
+from netconf_server.netconf_kafka_client import provide_configured_kafka_client
from netconf_server.netconf_rest_server import NetconfRestServer
from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
@@ -28,14 +29,18 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
logging.basicConfig(
handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_rest_application.log")],
- level=logging.DEBUG
+ level=logging.INFO
)
logger = logging.getLogger("netconf_rest_application")
def start_rest_server(session, connection, server_rest: NetconfRestServer, netconf_app_configuration: NetconfAppConfiguration):
sysrepo_cfg_manager = create_conf_manager(session, connection)
- server_rest.start(sysrepo_cfg_manager, netconf_app_configuration)
+ kafka_client = provide_configured_kafka_client(
+ netconf_app_configuration.kafka_host_name,
+ netconf_app_configuration.kafka_port
+ )
+ server_rest.start(sysrepo_cfg_manager, kafka_client, netconf_app_configuration.kafka_topic)
def create_rest_server() -> NetconfRestServer:
diff --git a/src/python/netconf_server/kafka_consumer_factory.py b/src/python/netconf_server/kafka_consumer_factory.py
new file mode 100644
index 0000000..332cd21
--- /dev/null
+++ b/src/python/netconf_server/kafka_consumer_factory.py
@@ -0,0 +1,35 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+from json import loads
+
+from kafka import KafkaConsumer
+
+STANDARD_CHARSETS_UTF8 = 'utf-8'
+
+
+def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
+ return KafkaConsumer(topic,
+ consumer_timeout_ms=1000,
+ group_id='netconf-group',
+ auto_offset_reset='earliest',
+ enable_auto_commit=False,
+ bootstrap_servers=[server],
+ value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
+ )
diff --git a/src/python/netconf_server/netconf_app_configuration.py b/src/python/netconf_server/netconf_app_configuration.py
index 0e25a41..1aaf12c 100644
--- a/src/python/netconf_server/netconf_app_configuration.py
+++ b/src/python/netconf_server/netconf_app_configuration.py
@@ -38,6 +38,7 @@ class NetconfAppConfiguration(object):
self.kafka_topic = kafka_topic
def __str__(self):
- return "NetconfAppConfiguration[configuration_file -> '{}', kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\
+ return "NetconfAppConfiguration[configuration_file -> '{}', " \
+ "kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\
.format(self.module_configuration_file_path, self.kafka_host_name, self.kafka_port, self.kafka_topic)
diff --git a/src/python/netconf_server/netconf_change_listener.py b/src/python/netconf_server/netconf_change_listener.py
index 44910d1..628f757 100644
--- a/src/python/netconf_server/netconf_change_listener.py
+++ b/src/python/netconf_server/netconf_change_listener.py
@@ -19,21 +19,46 @@
###
import logging
+from kafka.producer.future import FutureRecordMetadata
+
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
logger = logging.getLogger(__name__)
class NetconfChangeListener(object):
- def __init__(self, subscriptions: list):
+ def __init__(self, subscriptions: list, kafka_client: NetconfKafkaClient, topic: str):
self.subscriptions = subscriptions
+ self.kafka_client = kafka_client
+ self.topic = topic
def run(self, session):
for subscription in self.subscriptions:
- subscription.callback_function = self.__on_module_configuration_change
+ subscription.callback_function = self._on_module_configuration_change
subscription.subscribe_on_model_change(session)
+ def _on_module_configuration_change(self, config_change_data: ConfigChangeData):
+ logger.info("Received module changed: {} , {} ".format(config_change_data.event, config_change_data.changes))
+ if config_change_data.event != "done":
+ self._send_change_to_kafka(config_change_data)
+
+ def _send_change_to_kafka(self, config_change_data):
+ for change in config_change_data.changes:
+ try:
+ kafka_message = NetconfChangeListener._create_kafka_message(change)
+ logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic))
+ response = self.kafka_client.send(self.topic, kafka_message) # type: FutureRecordMetadata
+ logging.info("Response from Kafka: {}".format(response.get(timeout=1)))
+
+ except Exception as e:
+ logger.error("Exception occurred during handling of sysrepo config change", e)
+ logger.info("Module changes sent to Kafka. Operation finished.")
+
@staticmethod
- def __on_module_configuration_change(config_change_data: ConfigChangeData):
- logger.info("Received module changed: %s , %s " % (config_change_data.event, config_change_data.changes))
+ def _create_kafka_message(change):
+ return NetconfKafkaMessageFactory.create(SysrepoMessage(change))
diff --git a/src/python/netconf_server/netconf_change_listener_factory.py b/src/python/netconf_server/netconf_change_listener_factory.py
index fa5e071..9b9ff24 100644
--- a/src/python/netconf_server/netconf_change_listener_factory.py
+++ b/src/python/netconf_server/netconf_change_listener_factory.py
@@ -19,7 +19,10 @@
###
import logging
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
from netconf_server.netconf_change_listener import NetconfChangeListener
+from netconf_server.netconf_kafka_client import NetconfKafkaClient, provide_configured_kafka_client
from netconf_server.sysrepo_interface.config_change_subscriber import ConfigChangeSubscriber
logger = logging.getLogger(__name__)
@@ -27,8 +30,9 @@ logger = logging.getLogger(__name__)
class NetconfChangeListenerFactory(object):
- def __init__(self, modules_to_subscribe_names: list):
+ def __init__(self, modules_to_subscribe_names: list, app_configuration: NetconfAppConfiguration):
self.modules_to_subscribe_names = modules_to_subscribe_names
+ self.app_configuration = app_configuration
def create(self) -> NetconfChangeListener:
subscriptions = list()
@@ -36,5 +40,13 @@ class NetconfChangeListenerFactory(object):
subscriptions.append(
ConfigChangeSubscriber(module_name)
)
- return NetconfChangeListener(subscriptions)
+ kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client(
+ self.app_configuration.kafka_host_name,
+ self.app_configuration.kafka_port
+ )
+
+ return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic)
+ @staticmethod
+ def _try_to_create_kafka_client(kafka_host_name: str, kafka_port: int):
+ return provide_configured_kafka_client(kafka_host_name, kafka_port) # type: NetconfKafkaClient
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py
index b0effc3..8687802 100644
--- a/src/python/netconf_server/netconf_kafka_client.py
+++ b/src/python/netconf_server/netconf_kafka_client.py
@@ -18,32 +18,33 @@
# ============LICENSE_END=========================================================
###
import logging
-from json import dumps, loads
-from typing import Callable
+from json import dumps
+from typing import Callable, Any
from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import NoBrokersAvailable
from kafka.producer.future import FutureRecordMetadata
+from retry import retry
+
+from netconf_server.kafka_consumer_factory import provide_kafka_consumer
STANDARD_CHARSETS_UTF8 = 'utf-8'
-logger = logging.getLogger("netconf_kafka_client")
+logger = logging.getLogger(__name__)
-def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
- return KafkaConsumer(topic,
- consumer_timeout_ms=1000,
- group_id='netconf-group',
- auto_offset_reset='earliest',
- enable_auto_commit=True,
- bootstrap_servers=[server],
- value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
- )
+@retry(NoBrokersAvailable, tries=3, delay=5)
+def provide_configured_kafka_client(kafka_host_name, kafka_port):
+ return NetconfKafkaClient.create(
+ host=kafka_host_name,
+ port=kafka_port
+ ) # type: NetconfKafkaClient
class NetconfKafkaClient(object):
@staticmethod
- def create(host: str, port: int) -> object:
+ def create(host: str, port: int):
server = "{}:{}".format(host, port)
producer = KafkaProducer(
bootstrap_servers=server,
@@ -59,7 +60,7 @@ class NetconfKafkaClient(object):
self._producer = producer
self._get_kafka_consumer = get_kafka_consumer_func
- def send(self, topic: str, value: str) -> FutureRecordMetadata:
+ def send(self, topic: str, value: Any) -> FutureRecordMetadata:
return self._producer.send(
topic=topic,
value=value
@@ -72,7 +73,7 @@ class NetconfKafkaClient(object):
consumer = self._get_kafka_consumer(topic)
for message in consumer:
message_value = message.value
- logger.debug("Fetched config change %s" % message_value)
+ logger.info("Fetched config change %s" % message_value)
messages.append(message_value)
return messages
diff --git a/src/python/netconf_server/netconf_kafka_message_factory.py b/src/python/netconf_server/netconf_kafka_message_factory.py
new file mode 100644
index 0000000..e39556e
--- /dev/null
+++ b/src/python/netconf_server/netconf_kafka_message_factory.py
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+import logging
+
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
+
+logger = logging.getLogger(__name__)
+
+
+class NetconfKafkaMessageFactory(object):
+
+ @classmethod
+ def create(cls, change: SysrepoMessage) -> dict:
+ message = {}
+ if change and change.is_modified():
+ logger.debug("Parsing change modified")
+ message = cls._create_modified_message(change)
+ elif change and change.is_created():
+ logger.debug("Parsing change created")
+ message = cls._create_created_message(change)
+ return message
+
+ @classmethod
+ def _create_created_message(cls, change: SysrepoMessage) -> dict:
+ message = {"type": "ChangeCreated"}
+ if change.value():
+ message["new"] = {"path": change.xpath(), "value": change.value()}
+ return message
+
+ @classmethod
+ def _create_modified_message(cls, change: SysrepoMessage) -> dict:
+ message = {"type": "ChangeModified"}
+ if change.prev_val():
+ message["old"] = {"path": change.xpath(), "value": change.prev_val()}
+ if change.value():
+ message["new"] = {"path": change.xpath(), "value": change.value()}
+ return message
diff --git a/src/python/netconf_server/netconf_rest_server.py b/src/python/netconf_server/netconf_rest_server.py
index 8568454..edaa6e8 100644
--- a/src/python/netconf_server/netconf_rest_server.py
+++ b/src/python/netconf_server/netconf_rest_server.py
@@ -18,9 +18,8 @@
# ============LICENSE_END=========================================================
###
-from flask import Flask, logging, make_response, Response, request
+from flask import Flask, logging, make_response, Response, request, jsonify
-from netconf_server.netconf_app_configuration import NetconfAppConfiguration
from netconf_server.netconf_kafka_client import NetconfKafkaClient
from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
@@ -29,7 +28,8 @@ class NetconfRestServer:
_rest_server: Flask = Flask("server")
logger = logging.create_logger(_rest_server)
_configuration_manager: SysrepoConfigurationManager
- _app_configuration: NetconfAppConfiguration
+ _kafka_topic: str
+ _kafka_client: NetconfKafkaClient
def __init__(self, host='0.0.0.0', port=6555):
self._host = host
@@ -37,9 +37,11 @@ class NetconfRestServer:
def start(self,
configuration_manager: SysrepoConfigurationManager,
- netconf_app_configuration: NetconfAppConfiguration):
+ kafka_client: NetconfKafkaClient,
+ kafka_topic: str):
NetconfRestServer._configuration_manager = configuration_manager
- NetconfRestServer._app_configuration = netconf_app_configuration
+ NetconfRestServer._kafka_client = kafka_client
+ NetconfRestServer._kafka_topic = kafka_topic
Flask.run(
NetconfRestServer._rest_server,
host=self._host,
@@ -54,22 +56,11 @@ class NetconfRestServer:
@staticmethod
@_rest_server.route("/readiness")
def _readiness_check():
- try:
- NetconfRestServer.__try_connect_to_kafka()
+ if NetconfRestServer._kafka_client:
return Response('Ready', status=200)
- except Exception as e:
- NetconfRestServer.logger.error("Unable to create a Kafka client", e)
+ else:
return Response('Not Ready', status=503)
- # if Kafka is up & running and hostname with port is proper, then client will be created; otherwise
- # an error will be reported
- @staticmethod
- def __try_connect_to_kafka():
- NetconfKafkaClient.create(
- host=NetconfRestServer._app_configuration.kafka_host_name,
- port=NetconfRestServer._app_configuration.kafka_port
- )
-
@staticmethod
@_rest_server.route("/change_config/<path:module_name>", methods=['POST'])
def _change_config(module_name):
@@ -78,12 +69,17 @@ class NetconfRestServer:
return NetconfRestServer.__create_http_response(202, "Accepted")
@staticmethod
+ @_rest_server.route("/change_history")
+ def _change_history():
+ history = NetconfRestServer._kafka_client.get_all_messages_from(NetconfRestServer._kafka_topic)
+ return jsonify(history), 200
+
+ @staticmethod
@_rest_server.route("/get_config/<path:module_name>", methods=['GET'])
def _get_config(module_name):
data = NetconfRestServer._configuration_manager.get_configuration(module_name)
return NetconfRestServer.__create_http_response(200, data)
-
@staticmethod
def __create_http_response(code, message):
return make_response(
diff --git a/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py b/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py
index faa8254..e3e09f0 100644
--- a/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py
+++ b/src/python/netconf_server/sysrepo_interface/config_change_subscriber.py
@@ -22,7 +22,7 @@ import logging
from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
-logger = logging.getLogger("sysrep_config_change_subscriber")
+logger = logging.getLogger(__name__)
class ConfigChangeSubscriber(object):
diff --git a/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py b/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py
new file mode 100644
index 0000000..e21a9b5
--- /dev/null
+++ b/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py
@@ -0,0 +1,42 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+import sysrepo
+
+
+class SysrepoMessage(object):
+
+ def __init__(self, change):
+ self._change = change
+
+ def is_modified(self):
+ return isinstance(self._change, sysrepo.ChangeModified)
+
+ def is_created(self):
+ return isinstance(self._change, sysrepo.ChangeCreated)
+
+ def value(self):
+ return self._change.value
+
+ def xpath(self):
+ return self._change.xpath
+
+ def prev_val(self):
+ return self._change.prev_val
diff --git a/src/python/requirements.txt b/src/python/requirements.txt
index eb07c40..1654c50 100644
--- a/src/python/requirements.txt
+++ b/src/python/requirements.txt
@@ -21,3 +21,4 @@
sysrepo==0.4.2
Flask==1.1.1
kafka-python==2.0.2
+retry==0.9.2
diff --git a/src/python/test-requirements.txt b/src/python/test-requirements.txt
index 4983061..75de922 100644
--- a/src/python/test-requirements.txt
+++ b/src/python/test-requirements.txt
@@ -20,4 +20,5 @@
pytest==6.2.2
kafka-python==2.0.2
+retry==0.9.2
diff --git a/src/python/tests/unit/test_netconf_change_listener.py b/src/python/tests/unit/test_netconf_change_listener.py
new file mode 100644
index 0000000..c9e11d3
--- /dev/null
+++ b/src/python/tests/unit/test_netconf_change_listener.py
@@ -0,0 +1,74 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+import unittest
+from unittest.mock import MagicMock
+
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
+
+sys.modules['sysrepo'] = MagicMock()
+from netconf_server.netconf_change_listener import NetconfChangeListener
+
+KAFKA_TOPIC = "config"
+
+
+class TestNetconfChangeListener(unittest.TestCase):
+ def test_should_subscribe_on_run(self):
+ # given
+ subscriber1 = MagicMock()
+ subscriber2 = MagicMock()
+ kafka_client = MagicMock()
+ session = MagicMock()
+ netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC)
+
+ # when
+ netconf_change_listener.run(session)
+
+ # then
+ subscriber1.subscribe_on_model_change.assert_called_once()
+ self.assertEqual(subscriber1.callback_function, netconf_change_listener._on_module_configuration_change)
+ subscriber2.subscribe_on_model_change.assert_called_once()
+ self.assertEqual(subscriber2.callback_function, netconf_change_listener._on_module_configuration_change)
+
+ def test_should_send_two_changes_at_kafka(self):
+ # given
+ subscriber1 = MagicMock()
+ subscriber2 = MagicMock()
+ kafka_client = MagicMock()
+ netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC)
+ NetconfChangeListener._create_kafka_message = lambda _: MagicMock()
+
+ # when
+ netconf_change_listener._on_module_configuration_change(
+ ConfigChangeData(
+ event="event",
+ req_id=1,
+ changes=[MagicMock(), MagicMock()]
+ )
+ )
+
+ # then
+ self.assertEqual(kafka_client.send.call_count, 2)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/tests/unit/test_netconf_chang_listener.py b/src/python/tests/unit/test_netconf_change_listener_factory.py
index c2889f1..113f96c 100644
--- a/src/python/tests/unit/test_netconf_chang_listener.py
+++ b/src/python/tests/unit/test_netconf_change_listener_factory.py
@@ -20,16 +20,23 @@
import unittest
from unittest.mock import MagicMock
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+sys.modules['sysrepo'] = MagicMock()
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory
from tests.mocs.mocked_session import MockedSession
-class TestNetconfServer(unittest.TestCase):
+class TestNetconfChangeListenerFactory(unittest.TestCase):
def test_should_create_and_run_netconf_server_with_one_model(self):
# given
modules_to_subscribe_names = ["test"]
- server = NetconfChangeListenerFactory(modules_to_subscribe_names).create()
+ factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names)
+ server = factory.create()
session = MockedSession()
session.subscribe_module_change = MagicMock()
@@ -42,7 +49,8 @@ class TestNetconfServer(unittest.TestCase):
def test_should_create_and_run_netconf_server_with_multiple_models(self):
# given
modules_to_subscribe_names = ["test", "test2", "test3"]
- server = NetconfChangeListenerFactory(modules_to_subscribe_names).create()
+ factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names)
+ server = factory.create()
session = MockedSession()
session.subscribe_module_change = MagicMock()
@@ -51,3 +59,12 @@ class TestNetconfServer(unittest.TestCase):
# then
self.assertEqual(session.subscribe_module_change.call_count, 3)
+
+ @staticmethod
+ def _given_netconf_change_listener_factory(modules_to_subscribe_names: list) -> NetconfChangeListenerFactory:
+ app_configuration, _ = NetconfAppConfiguration.get_configuration(
+ ["../models", "models-configuration.ini", "127.0.0.1", "9092",
+ "kafka1"]) # type: NetconfAppConfiguration, str
+ factory = NetconfChangeListenerFactory(modules_to_subscribe_names, app_configuration)
+ NetconfChangeListenerFactory._try_to_create_kafka_client = lambda host, port: MagicMock()
+ return factory
diff --git a/src/python/tests/unit/test_netconf_kafka_message_factory.py b/src/python/tests/unit/test_netconf_kafka_message_factory.py
new file mode 100644
index 0000000..b899bd9
--- /dev/null
+++ b/src/python/tests/unit/test_netconf_kafka_message_factory.py
@@ -0,0 +1,75 @@
+from collections import namedtuple
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+sys.modules['sysrepo'] = MagicMock()
+
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
+
+from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
+
+SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val'])
+
+
+class TestNetconfKafkaMessageFactory(TestCase):
+ def test_should_return_empty_dict_when_sysrepo_message_is_none(self):
+ # when
+ actual = NetconfKafkaMessageFactory.create(None)
+
+ # then
+ self.assertEqual({}, actual)
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: False
+ sysrepo_message.is_created = lambda: True
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}},
+ actual
+ )
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: True
+ sysrepo_message.is_created = lambda: False
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+ actual
+ )
+
+ def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self):
+ # given
+ s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44)
+
+ sysrepo_message = SysrepoMessage(s)
+ sysrepo_message.is_modified = lambda: True
+ sysrepo_message.is_created = lambda: False
+
+ # when
+ actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+ # then
+ self.assertEqual(
+ {'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+ actual
+ )
+