aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZebek Bogumil <bogumil.zebek@nokia.com>2021-03-29 07:38:36 +0200
committerZebek Bogumil <bogumil.zebek@nokia.com>2021-03-29 09:36:27 +0200
commit18c0838e23209d58881dc59a4ae24a2111883ac2 (patch)
tree51d7ef3953667f2e00c733d53d91394b7818a1ce
parent0c3c63ccbe28a18f57b8341a73eca08a6ad832fc (diff)
Add Kafka envs
Signed-off-by: Bogumil Zebek <bogumil.zebek@nokia.com> Issue-ID: INT-1869 Change-Id: Id8f4d3061fc16e8cb2c16bf32058ec8527e4c6c3
-rw-r--r--.gitignore2
-rw-r--r--Dockerfile3
-rw-r--r--docker-compose.yml38
-rwxr-xr-xscripts/run-netconf-server-application.sh15
-rwxr-xr-xscripts/set-up-netopeer.sh19
-rw-r--r--src/python/netconf_change_listener_application.py21
-rw-r--r--src/python/netconf_rest_application.py10
-rw-r--r--src/python/netconf_server/netconf_app_configuration.py43
-rw-r--r--src/python/netconf_server/netconf_kafka_client.py78
-rw-r--r--src/python/requirements.txt1
-rw-r--r--src/python/test-requirements.txt1
-rw-r--r--src/python/tests/snippet/README.md16
-rw-r--r--src/python/tests/snippet/kafka_consumer.py (renamed from src/python/tests/netconf_server/sysrepo_configuration/__init__.py)15
-rw-r--r--src/python/tests/snippet/kafka_producer.py (renamed from src/python/tests/netconf_server/sysrepo_interface/__init__.py)23
-rw-r--r--src/python/tests/unit/__init__.py (renamed from src/python/tests/netconf_server/__init__.py)0
-rw-r--r--src/python/tests/unit/sysrepo_configuration/__init__.py0
-rw-r--r--src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_loader.py (renamed from src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_loader.py)0
-rw-r--r--src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_manager_.py (renamed from src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_manager_.py)0
-rw-r--r--src/python/tests/unit/sysrepo_interface/__init__.py0
-rw-r--r--src/python/tests/unit/sysrepo_interface/test_config_change_subscriber.py (renamed from src/python/tests/netconf_server/sysrepo_interface/test_config_change_subscriber.py)0
-rw-r--r--src/python/tests/unit/test_netconf_chang_listener.py (renamed from src/python/tests/netconf_server/test_netconf_chang_listener.py)0
-rw-r--r--src/python/tests/unit/test_netconf_kafka_client.py60
-rw-r--r--src/python/tox.ini4
23 files changed, 321 insertions, 28 deletions
diff --git a/.gitignore b/.gitignore
index 972ea9f..2072bde 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,5 @@
**/logs
**/venv
**/__pycache__
+
+.tox/
diff --git a/Dockerfile b/Dockerfile
index d56d20a..1303b40 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,5 +18,8 @@ RUN mkdir -p /resources/certs && \
./scripts/generate-certificates.sh /resources/certs
ENV ENABLE_TLS=false
+ENV KAFKA_HOST_NAME="kafka1"
+ENV KAFKA_PORT=9092
+ENV KAFKA_TOPIC="config:1:1"
ENTRYPOINT ["./scripts/set-up-netopeer.sh", "/resources/models", "/resources/certs"]
diff --git a/docker-compose.yml b/docker-compose.yml
index fbc9516..bd268a8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,11 +1,43 @@
version: '3'
services:
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - "2181:2181"
+ networks:
+ - netconfnetwork
+
+ kafka1:
+ image: wurstmeister/kafka:1.1.0
+ ports:
+ - "9092:9092"
+ hostname: kafka1
+ networks:
+ - netconfnetwork
+ environment:
+ KAFKA_ADVERTISED_PORT: 9092
+ KAFKA_ADVERTISED_HOST_NAME: kafka1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_CREATE_TOPICS: "config:1:1"
+ KAFKA_DELETE_RETENTION_MS: 604800000
+ KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 604800000
+ depends_on:
+ - zookeeper
netconf-server:
container_name: netconf-server
image: onap/org.onap.integration.nfsimulator.netconfserver:latest
ports:
- - "830:830"
- - "6513:6513"
- - "6555:6555"
+ - "830:830"
+ - "6513:6513"
+ - "6555:6555"
+ networks:
+ - netconfnetwork
+ depends_on:
+ - zookeeper
+ - kafka1
+
+networks:
+ netconfnetwork:
+ driver: bridge
diff --git a/scripts/run-netconf-server-application.sh b/scripts/run-netconf-server-application.sh
index b07da8a..81ce10f 100755
--- a/scripts/run-netconf-server-application.sh
+++ b/scripts/run-netconf-server-application.sh
@@ -19,19 +19,20 @@
# ============LICENSE_END=========================================================
###
-if [ "$#" -eq 2 ]; then
+if [ "$#" -eq 5 ]; then
## Set up variable
MODELS_CONFIG_PATH=$1
MODELS_CONFIG_NAME=$2
+ KAFKA_HOST_NAME=$3
+ KAFKA_HOST_PORT=$4
+ KAFKA_TOPIC=$5
- echo "Starting NETCONF Change listener"
- python3 ./application/netconf_change_listener_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME &
-
-
- echo "Starting NETCONF Rest server"
- python3 ./application/netconf_rest_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME &
+ echo "[INFO] Starting NETCONF Change listener"
+ python3 ./application/netconf_change_listener_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME $KAFKA_HOST_NAME $KAFKA_HOST_PORT $KAFKA_TOPIC &
+ echo "[INFO] Starting NETCONF Rest server"
+ python3 ./application/netconf_rest_application.py $MODELS_CONFIG_PATH/$MODELS_CONFIG_NAME $KAFKA_HOST_NAME $KAFKA_HOST_PORT $KAFKA_TOPIC &
else
echo "[ERROR] Invalid number of arguments. Please provide all required arguments."
diff --git a/scripts/set-up-netopeer.sh b/scripts/set-up-netopeer.sh
index e7b4f76..ab285d8 100755
--- a/scripts/set-up-netopeer.sh
+++ b/scripts/set-up-netopeer.sh
@@ -19,11 +19,17 @@
# ============LICENSE_END=========================================================
###
-if [ "$#" -ge 1 ]; then
+echo "[INFO] Starting NETCONF Server app configuration ..."
+
+if [ "$#" -gt 1 ]; then
## Set up variable
SCRIPTS_DIR=$PWD/"$(dirname $0)"
enable_tls=${ENABLE_TLS:-false}
+ kafka_host_name=${KAFKA_HOST_NAME:-"localhost"}
+ kafka_port=${KAFKA_PORT:-9092}
+ kafka_topic=${KAFKA_TOPIC=-"config:1:1"}
+ models_config_path=$1
models_configuration_file_name=${MODELS_CONFIGURATION_FILE_NAME:-models-configuration.ini}
## Install all modules from given directory
@@ -32,19 +38,22 @@ if [ "$#" -ge 1 ]; then
## If TLS is enabled start initializing certificates
if [[ "$enable_tls" == "true" ]]; then
if [ "$#" -ge 2 ]; then
- echo "initializing TLS"
+ echo "[INFO] Initializing TLS"
$SCRIPTS_DIR/install-tls-with-custom-certificates.sh $SCRIPTS_DIR/tls $2
else
- echo "Missing second argument: path to file with certificates for TLS."
+ echo "[ERROR] Missing second argument: path to file with certificates for TLS."
fi
fi
+ echo "[INFO] NETCONF Server configuration finished."
+
## Run netconf server application
- $SCRIPTS_DIR/run-netconf-server-application.sh $1 $models_configuration_file_name
+ $SCRIPTS_DIR/run-netconf-server-application.sh $models_config_path $models_configuration_file_name $kafka_host_name $kafka_port $kafka_topic
## Run sysrepo supervisor
+ echo "[INFO] Starting Netopeer Server ..."
/usr/bin/supervisord -c /etc/supervisord.conf
else
- echo "Missing first argument: path to file with YANG models."
+ echo "[ERROR] Unable to configure application. Provide all required arguments."
fi
diff --git a/src/python/netconf_change_listener_application.py b/src/python/netconf_change_listener_application.py
index a4dd8bd..34b00b3 100644
--- a/src/python/netconf_change_listener_application.py
+++ b/src/python/netconf_change_listener_application.py
@@ -20,8 +20,10 @@
import asyncio
import sys
import logging
-from netconf_server.netconf_rest_server import NetconfRestServer
-from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
+
+from typing import Tuple
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
from netconf_server.netconf_change_listener import NetconfChangeListener
from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory
@@ -41,17 +43,20 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen
asyncio.get_event_loop().run_forever()
-def create_change_listener() -> NetconfChangeListener:
- configuration = SysrepoConfigurationLoader.load_configuration(sys.argv[1])
+def create_change_listener(module_configuration_file_path: str) -> NetconfChangeListener:
+ configuration = SysrepoConfigurationLoader.load_configuration(module_configuration_file_path)
return NetconfChangeListenerFactory(configuration.models_to_subscribe_to).create()
if __name__ == "__main__":
- if len(sys.argv) >= 2:
+ app_configuration, error = NetconfAppConfiguration.get_configuration(sys.argv) # type: NetconfAppConfiguration, str
+
+ if app_configuration:
+ logger.info("Netconf change listener application configuration: {}".format(app_configuration))
try:
- netconf_change_listener = create_change_listener()
+ netconf_change_listener = create_change_listener(app_configuration.module_configuration_file_path)
SysrepoClient().run_in_session(run_server_forever, netconf_change_listener)
except ConfigLoadingException:
- logger.error("File to load configuration from file %s" % sys.argv[1])
+ logger.error("File to load configuration from file %s" % app_configuration.module_configuration_file_path)
else:
- logger.error("Missing path to file with configuration argument required to start netconf server.")
+ logger.error(error)
diff --git a/src/python/netconf_rest_application.py b/src/python/netconf_rest_application.py
index 79f2336..0d255bc 100644
--- a/src/python/netconf_rest_application.py
+++ b/src/python/netconf_rest_application.py
@@ -19,10 +19,11 @@
###
import sys
import logging
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
from netconf_server.netconf_rest_server import NetconfRestServer
from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
-from netconf_server.sysrepo_configuration.sysrepo_configuration_loader import ConfigLoadingException
from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
logging.basicConfig(
@@ -46,8 +47,11 @@ def create_conf_manager(session, connection) -> SysrepoConfigurationManager:
if __name__ == "__main__":
- if len(sys.argv) >= 2:
+ app_configuration, error = NetconfAppConfiguration.get_configuration(sys.argv) # type: NetconfAppConfiguration, str
+
+ if app_configuration:
+ logger.info("Netconf rest application configuration: {}".format(app_configuration))
rest_server = create_rest_server()
SysrepoClient().run_in_session(start_rest_server, rest_server)
else:
- logger.error("Missing path to file with configuration argument required to start netconf server.")
+ logger.error(error)
diff --git a/src/python/netconf_server/netconf_app_configuration.py b/src/python/netconf_server/netconf_app_configuration.py
new file mode 100644
index 0000000..190e113
--- /dev/null
+++ b/src/python/netconf_server/netconf_app_configuration.py
@@ -0,0 +1,43 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+class NetconfAppConfiguration(object):
+
+ @staticmethod
+ def get_configuration(args: list):
+ if len(args) >= 4:
+ configuration_file = args[1]
+ kafka_host_name = args[2]
+ kafka_port = args[3]
+ kafka_topic = args[4]
+
+ return NetconfAppConfiguration(configuration_file, kafka_host_name, kafka_port, kafka_topic), None
+ else:
+ return None, "Invalid number of arguments. Please provide all required arguments."
+
+ def __init__(self, module_configuration_file_path: str, kafka_host_name: str, kafka_port: str, kafka_topic: str):
+ self.module_configuration_file_path = module_configuration_file_path
+ self.kafka_host_name = kafka_host_name
+ self.kafka_port = kafka_port
+ self.kafka_topic = kafka_topic
+
+ def __str__(self):
+ return "NetconfAppConfiguration[configuration_file -> '{}', kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\
+ .format(self.module_configuration_file_path, self.kafka_host_name, self.kafka_port, self.kafka_topic)
+
diff --git a/src/python/netconf_server/netconf_kafka_client.py b/src/python/netconf_server/netconf_kafka_client.py
new file mode 100644
index 0000000..b0effc3
--- /dev/null
+++ b/src/python/netconf_server/netconf_kafka_client.py
@@ -0,0 +1,78 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+import logging
+from json import dumps, loads
+from typing import Callable
+
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.producer.future import FutureRecordMetadata
+
+STANDARD_CHARSETS_UTF8 = 'utf-8'
+
+logger = logging.getLogger("netconf_kafka_client")
+
+
+def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
+ return KafkaConsumer(topic,
+ consumer_timeout_ms=1000,
+ group_id='netconf-group',
+ auto_offset_reset='earliest',
+ enable_auto_commit=True,
+ bootstrap_servers=[server],
+ value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
+ )
+
+
+class NetconfKafkaClient(object):
+
+ @staticmethod
+ def create(host: str, port: int) -> object:
+ server = "{}:{}".format(host, port)
+ producer = KafkaProducer(
+ bootstrap_servers=server,
+ value_serializer=lambda x: dumps(x).encode(STANDARD_CHARSETS_UTF8)
+ )
+
+ return NetconfKafkaClient(
+ producer=producer,
+ get_kafka_consumer_func=lambda topic: provide_kafka_consumer(topic, server)
+ )
+
+ def __init__(self, producer: KafkaProducer, get_kafka_consumer_func: Callable[[str], KafkaConsumer]):
+ self._producer = producer
+ self._get_kafka_consumer = get_kafka_consumer_func
+
+ def send(self, topic: str, value: str) -> FutureRecordMetadata:
+ return self._producer.send(
+ topic=topic,
+ value=value
+ )
+
+ def get_all_messages_from(self, topic: str) -> list:
+ logger.debug("Getting config changes from topic %s" % topic)
+
+ messages = []
+ consumer = self._get_kafka_consumer(topic)
+ for message in consumer:
+ message_value = message.value
+ logger.debug("Fetched config change %s" % message_value)
+ messages.append(message_value)
+
+ return messages
diff --git a/src/python/requirements.txt b/src/python/requirements.txt
index 0dc606a..eb07c40 100644
--- a/src/python/requirements.txt
+++ b/src/python/requirements.txt
@@ -20,3 +20,4 @@
sysrepo==0.4.2
Flask==1.1.1
+kafka-python==2.0.2
diff --git a/src/python/test-requirements.txt b/src/python/test-requirements.txt
index 4c3f573..4983061 100644
--- a/src/python/test-requirements.txt
+++ b/src/python/test-requirements.txt
@@ -19,4 +19,5 @@
###
pytest==6.2.2
+kafka-python==2.0.2
diff --git a/src/python/tests/snippet/README.md b/src/python/tests/snippet/README.md
new file mode 100644
index 0000000..a120182
--- /dev/null
+++ b/src/python/tests/snippet/README.md
@@ -0,0 +1,16 @@
+How to use
+----------
+ 1. Make modifications in 'docker-compose.yml' in root directory
+
+ Replace 'KAFKA_ADVERTISED_HOST_NAME: kafka1' by 'KAFKA_ADVERTISED_HOST_NAME: localhost'
+
+ 2. Build local netconf-server image in root directory
+
+ mvn clean package -Pdocker
+
+ 3. Run docker-compose in root directory
+
+ docker-compose up -d
+
+ 4. Run kafka_producer.py. Wait on results.
+ 5. Run kafka_consumer.py. Wait on results.
diff --git a/src/python/tests/netconf_server/sysrepo_configuration/__init__.py b/src/python/tests/snippet/kafka_consumer.py
index eeb06d5..eef3e4d 100644
--- a/src/python/tests/netconf_server/sysrepo_configuration/__init__.py
+++ b/src/python/tests/snippet/kafka_consumer.py
@@ -17,3 +17,18 @@
# limitations under the License.
# ============LICENSE_END=========================================================
###
+import logging
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+
+logging.basicConfig(filename='kafka_consumer.log', level=logging.DEBUG)
+
+if __name__ == "__main__":
+
+ client = NetconfKafkaClient.create(
+ host="localhost",
+ port=9092
+ )
+
+ messages = client.get_all_messages_from(topic='config')
+ print("Received {}".format(messages))
diff --git a/src/python/tests/netconf_server/sysrepo_interface/__init__.py b/src/python/tests/snippet/kafka_producer.py
index eeb06d5..30a898c 100644
--- a/src/python/tests/netconf_server/sysrepo_interface/__init__.py
+++ b/src/python/tests/snippet/kafka_producer.py
@@ -17,3 +17,26 @@
# limitations under the License.
# ============LICENSE_END=========================================================
###
+import logging
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+
+NUMBER_OF_MESSAGES = 1000
+
+logging.basicConfig(filename='kafka_producer.log', level=logging.DEBUG)
+
+if __name__ == "__main__":
+
+ client = NetconfKafkaClient.create(
+ host="localhost",
+ port=9092
+ )
+
+ for number in range(NUMBER_OF_MESSAGES):
+ print("Send {}".format(number))
+ data = {'number': number}
+ resp = client.send(
+ topic='config',
+ value=data
+ )
+ print("Response: {}".format(resp.get(timeout=4)))
diff --git a/src/python/tests/netconf_server/__init__.py b/src/python/tests/unit/__init__.py
index eeb06d5..eeb06d5 100644
--- a/src/python/tests/netconf_server/__init__.py
+++ b/src/python/tests/unit/__init__.py
diff --git a/src/python/tests/unit/sysrepo_configuration/__init__.py b/src/python/tests/unit/sysrepo_configuration/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/python/tests/unit/sysrepo_configuration/__init__.py
diff --git a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_loader.py b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_loader.py
index e5462e4..e5462e4 100644
--- a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_loader.py
+++ b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_loader.py
diff --git a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_manager_.py b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_manager_.py
index 5194218..5194218 100644
--- a/src/python/tests/netconf_server/sysrepo_configuration/test_sysrepo_configuration_manager_.py
+++ b/src/python/tests/unit/sysrepo_configuration/test_sysrepo_configuration_manager_.py
diff --git a/src/python/tests/unit/sysrepo_interface/__init__.py b/src/python/tests/unit/sysrepo_interface/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/python/tests/unit/sysrepo_interface/__init__.py
diff --git a/src/python/tests/netconf_server/sysrepo_interface/test_config_change_subscriber.py b/src/python/tests/unit/sysrepo_interface/test_config_change_subscriber.py
index 9817ba4..9817ba4 100644
--- a/src/python/tests/netconf_server/sysrepo_interface/test_config_change_subscriber.py
+++ b/src/python/tests/unit/sysrepo_interface/test_config_change_subscriber.py
diff --git a/src/python/tests/netconf_server/test_netconf_chang_listener.py b/src/python/tests/unit/test_netconf_chang_listener.py
index c2889f1..c2889f1 100644
--- a/src/python/tests/netconf_server/test_netconf_chang_listener.py
+++ b/src/python/tests/unit/test_netconf_chang_listener.py
diff --git a/src/python/tests/unit/test_netconf_kafka_client.py b/src/python/tests/unit/test_netconf_kafka_client.py
new file mode 100644
index 0000000..9eff761
--- /dev/null
+++ b/src/python/tests/unit/test_netconf_kafka_client.py
@@ -0,0 +1,60 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+from unittest import TestCase
+from unittest.mock import MagicMock, call
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+
+MESSAGE_1 = "{'number':1}"
+MESSAGE_2 = "{'number':2}"
+
+TOPIC_NAME = 'config'
+
+
+class TestNetconfKafkaClient(TestCase):
+
+ def setUp(self):
+ self.producer = MagicMock()
+ self.kafka_customer_func = MagicMock(
+ return_value=[MagicMock(value=MESSAGE_1), MagicMock(value=MESSAGE_2)]
+ )
+ self.test_obj = NetconfKafkaClient(
+ producer=self.producer,
+ get_kafka_consumer_func=self.kafka_customer_func
+ )
+
+ def test_create_instance(self):
+ self.assertIsNotNone(self.test_obj)
+
+ def test_send_a_message_to_kafka(self):
+ # when
+ self.test_obj.send(TOPIC_NAME, MESSAGE_1)
+
+ # then
+ self.producer.assert_has_calls([call.send(topic=TOPIC_NAME, value=MESSAGE_1)])
+
+ def test_get_all_messages_from_kafka_topic(self):
+ # when
+ messages = self.test_obj.get_all_messages_from(TOPIC_NAME)
+
+ # then
+ self.assertTrue(len(messages) == 2)
+ self.assertTrue(MESSAGE_1 in messages)
+ self.assertTrue(MESSAGE_2 in messages)
diff --git a/src/python/tox.ini b/src/python/tox.ini
index dd76991..c3a5083 100644
--- a/src/python/tox.ini
+++ b/src/python/tox.ini
@@ -1,11 +1,11 @@
[tox]
-envlist = py36
+envlist = [py36, py38]
skipsdist = true
[testenv]
commands = pytest
basepython = python3
-deps = -r test-requirements.txt
+deps = -rtest-requirements.txt
[testenv:pytest]
commands = pytest -v