From ee0c74e28de9552e683724264b101362c144694c Mon Sep 17 00:00:00 2001 From: ebo Date: Sun, 19 Apr 2020 01:33:21 +0100 Subject: netconf-pnp-simulator: fix sysrepod crash on TLS reconfig The crash was caused by: - the '--permanent' option while updating the ietf-keystore by sysrepocfg - missing some Yang modules on sysrepo installation Other changes: 1. Added TLS integration tests, including reconfiguration 2. reconfigure-*.sh are now synchronous, only returnig after restart is completed Issue-ID: INT-1516 Change-Id: Iddc03fc968aaab60931596045437ba0c78448b08 Signed-off-by: ebo --- .../netconf-pnp-simulator/engine/tests/test_tls.py | 115 +++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 test/mocks/netconf-pnp-simulator/engine/tests/test_tls.py (limited to 'test/mocks/netconf-pnp-simulator/engine/tests/test_tls.py') diff --git a/test/mocks/netconf-pnp-simulator/engine/tests/test_tls.py b/test/mocks/netconf-pnp-simulator/engine/tests/test_tls.py new file mode 100644 index 000000000..f0adf447f --- /dev/null +++ b/test/mocks/netconf-pnp-simulator/engine/tests/test_tls.py @@ -0,0 +1,115 @@ +import os +import socket +import ssl +import tarfile +import tempfile +import time +from io import StringIO +from typing import List + +import docker +import pytest +from docker.models.containers import Container +from lxml import etree +from ncclient.transport.ssh import MSG_DELIM + +import settings + +HELLO_DTD = etree.DTD(StringIO(""" + + + + + +""")) + +INITIAL_CONFIG_DIR = "data/tls_initial" +NEW_CONFIG_DIR = "data/tls_new" + + +class TestTLS: + container: Container + + @classmethod + def setup_class(cls): + dkr = docker.from_env() + containers = dkr.containers.list(filters={"ancestor": "netconf-pnp-simulator:latest"}) + assert len(containers) == 1 + cls.container = containers[0] + + def test_tls_connect(self): + nc_connect(INITIAL_CONFIG_DIR) + + @pytest.mark.parametrize("round_id", [f"round #{i + 1}" for i in range(6)]) + def test_tls_reconfiguration(self, round_id): + # pylint: disable=W0613 + self.reconfigure_and_check(NEW_CONFIG_DIR, INITIAL_CONFIG_DIR) + self.reconfigure_and_check(INITIAL_CONFIG_DIR, NEW_CONFIG_DIR) + + def reconfigure_and_check(self, good_config_dir: str, bad_config_dir: str): + with simple_tar([f"{good_config_dir}/{b}.pem" for b in ["ca", "server_key", "server_cert"]]) as config_tar: + status = self.container.put_archive(f"/config/tls", config_tar) + assert status + test_start = int(time.time()) + exit_code, (_, err) = self.container.exec_run("/opt/bin/reconfigure-tls.sh", demux=True) + if exit_code != 0: + print(f"reconfigure-tls.sh failed with rc={exit_code}") + log_all("stderr", err) + log_all("Container Logs", self.container.logs(since=test_start)) + assert False + nc_connect(good_config_dir) + # Exception matching must be compatible with Py36 and Py37+ + with pytest.raises(ssl.SSLError, match=r".*\[SSL: CERTIFICATE_VERIFY_FAILED\].*"): + nc_connect(bad_config_dir) + + +def log_all(heading: str, lines: object): + print(f"{heading}:") + if isinstance(lines, bytes): + lines = lines.decode("utf-8") + if isinstance(lines, str): + lines = lines.split("\n") + for line in lines: + print(" ", line) + + +def simple_tar(paths: List[str]): + file = tempfile.NamedTemporaryFile() + with tarfile.open(mode="w", fileobj=file) as tar: + for path in paths: + abs_path = os.path.abspath(path) + tar.add(abs_path, arcname=os.path.basename(path), recursive=False) + file.seek(0) + return file + + +def nc_connect(config_dir: str): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock: + context = ssl.create_default_context() + context.load_verify_locations(f"{config_dir}/ca.pem") + context.load_cert_chain(certfile=f"{config_dir}/client_cert.pem", keyfile=f"{config_dir}/client_key.pem") + context.check_hostname = False + with context.wrap_socket(sock, server_side=False, server_hostname=settings.HOST) as conn: + conn.connect((settings.HOST, settings.TLS_PORT)) + buf = nc_read_msg(conn) + print(f"Received NETCONF HelloMessage:\n{buf}") + conn.close() + assert buf.endswith(MSG_DELIM) + hello_root = etree.XML(buf[:-len(MSG_DELIM)]) + valid = HELLO_DTD.validate(hello_root) + if not valid: + log_all("Invalid NETCONF msg", list(HELLO_DTD.error_log.filter_from_errors())) + assert False + + +def nc_read_msg(conn: ssl.SSLSocket): + buf = '' + while True: + data = conn.recv(4096) + if data: + buf += data.decode(encoding="utf-8") + if buf.endswith(MSG_DELIM): + break + else: + break + return buf -- cgit 1.2.3-korg