diff options
author | Zebek Bogumil <bogumil.zebek@nokia.com> | 2021-03-29 07:38:36 +0200 |
---|---|---|
committer | Zebek Bogumil <bogumil.zebek@nokia.com> | 2021-03-29 09:36:27 +0200 |
commit | 18c0838e23209d58881dc59a4ae24a2111883ac2 (patch) | |
tree | 51d7ef3953667f2e00c733d53d91394b7818a1ce | |
parent | 0c3c63ccbe28a18f57b8341a73eca08a6ad832fc (diff) |
Add Kafka envs
Signed-off-by: Bogumil Zebek <bogumil.zebek@nokia.com>
Issue-ID: INT-1869
Change-Id: Id8f4d3061fc16e8cb2c16bf32058ec8527e4c6c3
23 files changed, 321 insertions, 28 deletions
@@ -4,3 +4,5 @@ **/logs **/venv **/__pycache__ + +.tox/ @@ -18,5 +18,8 @@ RUN mkdir -p /resources/certs && \ ./scripts/generate-certificates.sh /resources/certs ENV ENABLE_TLS=false +ENV KAFKA_HOST_NAME="kafka1" +ENV KAFKA_PORT=9092 +ENV KAFKA_TOPIC="config:1:1" ENTRYPOINT ["./scripts/set-up-netopeer.sh", "/resources/models", "/resources/certs"] diff --git a/docker-compose.yml b/docker-compose.yml index fbc9516..bd268a8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,43 @@ version: '3' services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + networks: + - netconfnetwork + + kafka1: + image: wurstmeister/kafka:1.1.0 + ports: + - "9092:9092" + hostname: kafka1 + networks: + - netconfnetwork + environment: + KAFKA_ADVERTISED_PORT: 9092 + KAFKA_ADVERTISED_HOST_NAME: kafka1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CREATE_TOPICS: "config:1:1" + KAFKA_DELETE_RETENTION_MS: 604800000 + KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 604800000 + depends_on: + - zookeeper netconf-server: container_name: netconf-server image: onap/org.onap.integration.nfsimulator.netconfserver:latest ports: - - "830:830" - - "6513:6513" - - "6555:6555" + - "830:830" + - "6513:6513" + - "6555:6555" + networks: + - netconfnetwork + depends_on: + - zookeeper + - kafka1 + +networks: + netconfnetwork: + driver: bridge diff --git a/scripts/run-netconf-server-application.sh b/scripts/run-netconf-server-application.sh index b07da8a..81ce10f 100755 --- a/scripts/run-netconf-server-application.sh +++ b/scripts/run-netconf-server-application.sh @@ -19,19 +19,20 @@ # ============LICENSE_END========================================================= ### -if [ "$#" -eq 2 ]; then +if [ "$#" -eq 5 ]; then ## Set up variable MODELS_CONFIG_PATH=$1 MODELS_CONFIG_NAME=$2 + KAFKA_HOST_NAME=$3 + KAFKA_HOST_PORT=$4 + KAFKA_TOPIC=$5 - echo "Starting NETCONF Change listener" - python3 ./application/netconf_change_listener_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME & - - - echo "Starting NETCONF Rest server" - python3 ./application/netconf_rest_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME & + echo "[INFO] Starting NETCONF Change listener" + python3 ./application/netconf_change_listener_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME $KAFKA_HOST_NAME $KAFKA_HOST_PORT $KAFKA_TOPIC & + echo "[INFO] Starting NETCONF Rest server" + python3 ./application/netconf_rest_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME $KAFKA_HOST_NAME $KAFKA_HOST_PORT $KAFKA_TOPIC & else echo "[ERROR] Invalid number of arguments. Please provide all required arguments." diff --git a/scripts/set-up-netopeer.sh b/scripts/set-up-netopeer.sh index e7b4f76..ab285d8 100755 --- a/scripts/set-up-netopeer.sh +++ b/scripts/set-up-netopeer.sh @@ -19,11 +19,17 @@ # ============LICENSE_END========================================================= ### -if [ "$#" -ge 1 ]; then +echo "[INFO] Starting NETCONF Server app configuration ..." + +if [ "$#" -gt 1 ]; then ## Set up variable SCRIPTS_DIR=$PWD/"$(dirname $0)" enable_tls=${ENABLE_TLS:-false} + kafka_host_name=${KAFKA_HOST_NAME:-"localhost"} + kafka_port=${KAFKA_PORT:-9092} + kafka_topic=${KAFKA_TOPIC=-"config:1:1"} + models_config_path=$1 models_configuration_file_name=${MODELS_CONFIGURATION_FILE_NAME:-models-configuration.ini} ## Install all modules from given directory @@ -32,19 +38,22 @@ if [ "$#" -ge 1 ]; then ## If TLS is enabled start initializing certificates if [[ "$enable_tls" == "true" ]]; then if [ "$#" -ge 2 ]; then - echo "initializing TLS" + echo "[INFO] Initializing TLS" $SCRIPTS_DIR/install-tls-with-custom-certificates.sh $SCRIPTS_DIR/tls $2 else - echo "Missing second argument: path to file with certificates for TLS." + echo "[ERROR] Missing second argument: path to file with certificates for TLS." fi fi + echo "[INFO] NETCONF Server configuration finished." + ## Run netconf server application - $SCRIPTS_DIR/run-netconf-server-application.sh $1 $models_configuration_file_name + $SCRIPTS_DIR/run-netconf-server-application.sh $models_config_path $models_configuration_file_name $kafka_host_name $kafka_port $kafka_topic ## Run sysrepo supervisor + echo "[INFO] Starting Netopeer Server ..." /usr/bin/supervisord -c /etc/supervisord.conf else - echo "Missing first argument: path to file with YANG models." + echo "[ERROR] Unable to configure application. Provide all required arguments." fi diff --git a/src/python/netconf_change_listener_application.py b/src/python/netconf_change_listener_application.py index a4dd8bd..34b00b3 100644 --- a/src/python/netconf_change_listener_application.py +++ b/src/python/netconf_change_listener_application.py @@ -20,8 +20,10 @@ import asyncio import sys import logging -from netconf_server.netconf_rest_server import NetconfRestServer -from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager + +from typing import Tuple + +from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_change_listener import NetconfChangeListener from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory @@ -41,17 +43,20 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen asyncio.get_event_loop().run_forever() -def create_change_listener() -> NetconfChangeListener: - configuration = SysrepoConfigurationLoader.load_configuration(sys.argv[1]) +def create_change_listener(module_configuration_file_path: str) -> NetconfChangeListener: + configuration = SysrepoConfigurationLoader.load_configuration(module_configuration_file_path) return NetconfChangeListenerFactory(configuration.models_to_subscribe_to).create() if __name__ == "__main__": - if len(sys.argv) >= 2: + app_configuration, error = NetconfAppConfiguration.get_configuration(sys.argv) # type: NetconfAppConfiguration, str + + if app_configuration: + logger.info("Netconf change listener application configuration: {}".format(app_configuration)) try: - netconf_change_listener = create_change_listener() + netconf_change_listener = create_change_listener(app_configuration.module_configuration_file_path) SysrepoClient().run_in_session(run_server_forever, netconf_change_listener) except ConfigLoadingException: - logger.error("File to load configuration from file %s" % sys.argv[1]) + logger.error("File to load configuration from file %s" % app_configuration.module_configuration_file_path) else: - logger.error("Missing path to file with configuration argument required to start netconf server.") + logger.error(error) diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py index 79f2336..0d255bc 100644 --- a/src/python/netconf_rest_application.py +++ b/src/python/netconf_rest_application.py @@ -19,10 +19,11 @@ ### import sys import logging + +from netconf_server.netconf_app_configuration import NetconfAppConfiguration from netconf_server.netconf_rest_server import NetconfRestServer from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager -from netconf_server.sysrepo_configuration.sysrepo_configuration_loader import ConfigLoadingException from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient logging.basicConfig( @@ -46,8 +47,11 @@ def create_conf_manager(session, connection) -> SysrepoConfigurationManager: if __name__ == "__main__": - if len(sys.argv) >= 2: + app_configuration, error = NetconfAppConfiguration.get_configuration(sys.argv) # type: NetconfAppConfiguration, str + + if app_configuration: + logger.info("Netconf rest application configuration: {}".format(app_configuration)) rest_server = create_rest_server() SysrepoClient().run_in_session(start_rest_server, rest_server) else: - logger.error("Missing path to file with configuration argument required to start netconf server.") + logger.error(error) diff --git a/src/python/netconf_server/netconf_app_configuration.py b/src/python/netconf_server/netconf_app_configuration.py new file mode 100644 index 0000000..190e113 --- /dev/null +++ b/src/python/netconf_server/netconf_app_configuration.py @@ -0,0 +1,43 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +class NetconfAppConfiguration(object): + + @staticmethod + def get_configuration(args: list): + if len(args) >= 4: + configuration_file = args[1] + kafka_host_name = args[2] + kafka_port = args[3] + kafka_topic = args[4] + + return NetconfAppConfiguration(configuration_file, kafka_host_name, kafka_port, kafka_topic), None + else: + return None, "Invalid number of arguments. Please provide all required arguments." + + def __init__(self, module_configuration_file_path: str, kafka_host_name: str, kafka_port: str, kafka_topic: str): + self.module_configuration_file_path = module_configuration_file_path + self.kafka_host_name = kafka_host_name + self.kafka_port = kafka_port + self.kafka_topic = kafka_topic + + def __str__(self): + return "NetconfAppConfiguration[configuration_file -> '{}', kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\ + .format(self.module_configuration_file_path, self.kafka_host_name, self.kafka_port, self.kafka_topic) + diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py new file mode 100644 index 0000000..b0effc3 --- /dev/null +++ b/src/python/netconf_server/netconf_kafka_client.py @@ -0,0 +1,78 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +import logging +from json import dumps, loads +from typing import Callable + +from kafka import KafkaProducer, KafkaConsumer +from kafka.producer.future import FutureRecordMetadata + +STANDARD_CHARSETS_UTF8 = 'utf-8' + +logger = logging.getLogger("netconf_kafka_client") + + +def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer: + return KafkaConsumer(topic, + consumer_timeout_ms=1000, + group_id='netconf-group', + auto_offset_reset='earliest', + enable_auto_commit=True, + bootstrap_servers=[server], + value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8)) + ) + + +class NetconfKafkaClient(object): + + @staticmethod + def create(host: str, port: int) -> object: + server = "{}:{}".format(host, port) + producer = KafkaProducer( + bootstrap_servers=server, + value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8) + ) + + return NetconfKafkaClient( + producer=producer, + get_kafka_consumer_func=lambda topic: provide_kafka_consumer(topic, server) + ) + + def __init__(self, producer: KafkaProducer, get_kafka_consumer_func: Callable[[str], KafkaConsumer]): + self._producer = producer + self._get_kafka_consumer = get_kafka_consumer_func + + def send(self, topic: str, value: str) -> FutureRecordMetadata: + return self._producer.send( + topic=topic, + value=value + ) + + def get_all_messages_from(self, topic: str) -> list: + logger.debug("Getting config changes from topic %s" % topic) + + messages = [] + consumer = self._get_kafka_consumer(topic) + for message in consumer: + message_value = message.value + logger.debug("Fetched config change %s" % message_value) + messages.append(message_value) + + return messages diff --git a/src/python/requirements.txt b/src/python/requirements.txt index 0dc606a..eb07c40 100644 --- a/src/python/requirements.txt +++ b/src/python/requirements.txt @@ -20,3 +20,4 @@ sysrepo==0.4.2 Flask==1.1.1 +kafka-python==2.0.2 diff --git a/src/python/test-requirements.txt b/src/python/test-requirements.txt index 4c3f573..4983061 100644 --- a/src/python/test-requirements.txt +++ b/src/python/test-requirements.txt @@ -19,4 +19,5 @@ ### pytest==6.2.2 +kafka-python==2.0.2 diff --git a/src/python/tests/snippet/README.md b/src/python/tests/snippet/README.md new file mode 100644 index 0000000..a120182 --- /dev/null +++ b/src/python/tests/snippet/README.md @@ -0,0 +1,16 @@ +How to use +---------- + 1. Make modifications in 'docker-compose.yml' in root directory + + Replace 'KAFKA_ADVERTISED_HOST_NAME: kafka1' by 'KAFKA_ADVERTISED_HOST_NAME: localhost' + + 2. Build local netconf-server image in root directory + + mvn clean package -Pdocker + + 3. Run docker-compose in root directory + + docker-compose up -d + + 4. Run kafka_producer.py. Wait on results. + 5. Run kafka_consumer.py. Wait on results. diff --git a/src/python/tests/netconf_server/sysrepo_configuration/__init__.py b/src/python/tests/snippet/kafka_consumer.py index eeb06d5..eef3e4d 100644 --- a/src/python/tests/netconf_server/sysrepo_configuration/__init__.py +++ b/src/python/tests/snippet/kafka_consumer.py @@ -17,3 +17,18 @@ # limitations under the License. # ============LICENSE_END========================================================= ### +import logging + +from netconf_server.netconf_kafka_client import NetconfKafkaClient + +logging.basicConfig(filename='kafka_consumer.log', level=logging.DEBUG) + +if __name__ == "__main__": + + client = NetconfKafkaClient.create( + host="localhost", + port=9092 + ) + + messages = client.get_all_messages_from(topic='config') + print("Received {}".format(messages)) diff --git a/src/python/tests/netconf_server/sysrepo_interface/__init__.py b/src/python/tests/snippet/kafka_producer.py index eeb06d5..30a898c 100644 --- a/src/python/tests/netconf_server/sysrepo_interface/__init__.py +++ b/src/python/tests/snippet/kafka_producer.py @@ -17,3 +17,26 @@ # limitations under the License. # ============LICENSE_END========================================================= ### +import logging + +from netconf_server.netconf_kafka_client import NetconfKafkaClient + +NUMBER_OF_MESSAGES = 1000 + +logging.basicConfig(filename='kafka_producer.log', level=logging.DEBUG) + +if __name__ == "__main__": + + client = NetconfKafkaClient.create( + host="localhost", + port=9092 + ) + + for number in range(NUMBER_OF_MESSAGES): + print("Send {}".format(number)) + data = {'number': number} + resp = client.send( + topic='config', + value=data + ) + print("Response: {}".format(resp.get(timeout=4))) diff --git a/src/python/tests/netconf_server/__init__.py b/src/python/tests/unit/__init__.py index eeb06d5..eeb06d5 100644 --- a/src/python/tests/netconf_server/__init__.py +++ b/src/python/tests/unit/__init__.py diff --git a/src/python/tests/unit/sysrepo_configuration/__init__.py b/src/python/tests/unit/sysrepo_configuration/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/python/tests/unit/sysrepo_configuration/__init__.py diff --git a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_loader.py b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_loader.py index e5462e4..e5462e4 100644 --- a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_loader.py +++ b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_loader.py diff --git a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_manager_.py b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_manager_.py index 5194218..5194218 100644 --- a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_manager_.py +++ b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_manager_.py diff --git a/src/python/tests/unit/sysrepo_interface/__init__.py b/src/python/tests/unit/sysrepo_interface/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/python/tests/unit/sysrepo_interface/__init__.py diff --git a/src/python/tests/netconf_server/sysrepo_interface/test_config_change_subscriber.py b/src/python/tests/unit/sysrepo_interface/test_config_change_subscriber.py index 9817ba4..9817ba4 100644 --- a/src/python/tests/netconf_server/sysrepo_interface/test_config_change_subscriber.py +++ b/src/python/tests/unit/sysrepo_interface/test_config_change_subscriber.py diff --git a/src/python/tests/netconf_server/test_netconf_chang_listener.py b/src/python/tests/unit/test_netconf_chang_listener.py index c2889f1..c2889f1 100644 --- a/src/python/tests/netconf_server/test_netconf_chang_listener.py +++ b/src/python/tests/unit/test_netconf_chang_listener.py diff --git a/src/python/tests/unit/test_netconf_kafka_client.py b/src/python/tests/unit/test_netconf_kafka_client.py new file mode 100644 index 0000000..9eff761 --- /dev/null +++ b/src/python/tests/unit/test_netconf_kafka_client.py @@ -0,0 +1,60 @@ +### +# ============LICENSE_START======================================================= +# Netconf Server +# ================================================================================ +# Copyright (C) 2021 Nokia. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### +from unittest import TestCase +from unittest.mock import MagicMock, call + +from netconf_server.netconf_kafka_client import NetconfKafkaClient + +MESSAGE_1 = "{'number':1}" +MESSAGE_2 = "{'number':2}" + +TOPIC_NAME = 'config' + + +class TestNetconfKafkaClient(TestCase): + + def setUp(self): + self.producer = MagicMock() + self.kafka_customer_func = MagicMock( + return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)] + ) + self.test_obj = NetconfKafkaClient( + producer=self.producer, + get_kafka_consumer_func=self.kafka_customer_func + ) + + def test_create_instance(self): + self.assertIsNotNone(self.test_obj) + + def test_send_a_message_to_kafka(self): + # when + self.test_obj.send(TOPIC_NAME, MESSAGE_1) + + # then + self.producer.assert_has_calls([call.send(topic=TOPIC_NAME, value=MESSAGE_1)]) + + def test_get_all_messages_from_kafka_topic(self): + # when + messages = self.test_obj.get_all_messages_from(TOPIC_NAME) + + # then + self.assertTrue(len(messages) == 2) + self.assertTrue(MESSAGE_1 in messages) + self.assertTrue(MESSAGE_2 in messages) diff --git a/src/python/tox.ini b/src/python/tox.ini index dd76991..c3a5083 100644 --- a/src/python/tox.ini +++ b/src/python/tox.ini @@ -1,11 +1,11 @@ [tox] -envlist = py36 +envlist = [py36, py38] skipsdist = true [testenv] commands = pytest basepython = python3 -deps = -r test-requirements.txt +deps = -rtest-requirements.txt [testenv:pytest] commands = pytest -v |