diff options
Diffstat (limited to 'tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py')
-rw-r--r-- | tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py | 207 |
1 files changed, 0 insertions, 207 deletions
diff --git a/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py b/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py deleted file mode 100644 index b1c024ff..00000000 --- a/tests/sdnc/sdnc_netconf_tls_post_deploy/libraries/ClientManager.py +++ /dev/null @@ -1,207 +0,0 @@ -# ============LICENSE_START======================================================= -# Copyright (C) 2020 Nordix Foundation. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END========================================================= - -__author__ = "Ajay Deep Singh (ajay.deep.singh@est.tech)" -__copyright__ = "Copyright (C) 2020 Nordix Foundation" -__license__ = "Apache 2.0" - -import os -import shutil -import subprocess - -import docker -from OpenSSL import crypto -from docker.types import Mount - -DEV_NULL = open(os.devnull, 'wb') -NETCONF_PNP_SIM_CONTAINER_NAME = 'netconf-simulator' -ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/" - - -class ClientManager: - - def __init__(self, mount_path, truststore_path): - self.mount_path = mount_path - self.truststore_path = truststore_path - self.keyPem = mount_path + '/key.pem' - self.caCertPem = mount_path + '/ca.pem' - self.serverKeyPem = mount_path + '/server_key.pem' - self.serverCertPem = mount_path + '/server_cert.pem' - self.keystorePemPath = mount_path + '/keystore.pem' - self.keystoreP12Path = mount_path + '/keystore.p12' - self.keystorePassPath = mount_path + '/keystore.pass' - self.truststorePemPath = mount_path + '/truststore.pem' - self.truststoreP12Path = mount_path + '/truststore.p12' - self.truststorePassPath = mount_path + '/truststore.pass' - - # Function Create docker container. - def run_client_container(self, client_image, container_name, path_to_env, request_url, network): - self.create_mount_dir() - client = docker.from_env() - environment = self.read_env_list_from_file(path_to_env) - environment.append("REQUEST_URL=" + request_url) - container = client.containers.run( - image=client_image, - name=container_name, - environment=environment, - network=network, - user='root', - mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind'), - Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind')], - detach=True - ) - exitcode = container.wait() - return exitcode - - # Function to validate keystore/truststore can be opened with generated pass-phrase. - def can_open_keystore_and_truststore_with_pass(self, container_name): - if container_name != NETCONF_PNP_SIM_CONTAINER_NAME: - return self.can_open_keystore_and_truststore_pem_files() - else: - return self.can_open_keystore_and_truststore_p12_files() - - # Function to validate keystore.pem/truststore.pem exist and are not empty. - def can_open_keystore_and_truststore_pem_files(self): - try: - private_key = self.file_exist_and_not_empty(self.keyPem) - keystore_pem = self.file_exist_and_not_empty(self.keystorePemPath) - truststore_pem = self.file_exist_and_not_empty(self.truststorePemPath) - return private_key and keystore_pem and truststore_pem - except Exception as e: - print("UnExpected Error in validating keystore.pem/truststore.pem: {0}".format(e)) - return False - - # Function to validate keystore.p12/truststore.p12 can be opened with generated pass-phrase. - def can_open_keystore_and_truststore_p12_files(self): - can_open_keystore = self.can_open_p12_file_with_pass_file(self.keystorePassPath, self.keystoreP12Path) - can_open_truststore = self.can_open_p12_file_with_pass_file(self.truststorePassPath, self.truststoreP12Path) - return can_open_keystore & can_open_truststore - - # Method for Uploading Certificate in SDNC-Container. - # Creating/Uploading Server-key, Server-cert, Ca-cert PEM files in Netconf-Pnp-Simulator. - def can_install_keystore_and_truststore_certs(self, cmd, cmd_tls, container_name): - continue_exec = True - if container_name == NETCONF_PNP_SIM_CONTAINER_NAME: - print("Generating PEM files for {0} from P12 files".format(container_name)) - continue_exec = self.create_pem(self.keystorePassPath, self.keystoreP12Path, self.truststorePassPath, - self.truststoreP12Path) - else: - cmd = cmd_tls - if continue_exec: - print("Initiate Configuration Push for : {0}".format(container_name)) - resp_code = self.execute_bash_config(cmd, container_name) - if resp_code == 0: - print("Execution Successful for: {0}".format(container_name)) - return True - else: - print("Execution Failed for: {0}".format(container_name)) - return False - - def create_pem(self, keystore_pass_path, keystore_p12_path, truststore_pass_path, truststore_p12_path): - # Create [server_key.pem, server_cert.pem, ca.pem] files for Netconf-Pnp-Simulation/TLS Configuration. - try: - with open(self.serverKeyPem, "wb+") as key_file: - key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, - self.get_pkcs12(keystore_pass_path, - keystore_p12_path).get_privatekey())) - with open(self.serverCertPem, "wb+") as server_cert_file: - server_cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, - self.get_pkcs12(keystore_pass_path, - keystore_p12_path).get_certificate())) - with open(self.caCertPem, "wb+") as ca_cert_file: - ca_cert_file.write( - crypto.dump_certificate(crypto.FILETYPE_PEM, - self.get_pkcs12(truststore_pass_path, - truststore_p12_path).get_ca_certificates()[0])) - return True - except IOError as err: - print("I/O Error: {0}".format(err)) - return False - except Exception as e: - print("UnExpected Error: {0}".format(e)) - return False - - def can_open_p12_file_with_pass_file(self, pass_file_path, p12_file_path): - try: - if p12_file_path.split('/')[-1] == 'truststore.p12': - pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_ca_certificates()[0] - else: - pkcs12 = self.get_pkcs12(pass_file_path, p12_file_path).get_certificate() - if pkcs12 is None: - return False - return True - except IOError as err: - print("I/O Error PKCS12 Creation failed: {0}".format(err)) - return False - except Exception as e: - print("UnExpected Error PKCS12 Creation failed: {0}".format(e)) - return False - - def remove_client_container_and_save_logs(self, container_name, log_file_name): - client = docker.from_env() - container = client.containers.get(container_name) - text_file = open(ARCHIVES_PATH + container_name + '_' + log_file_name + ".log", "w") - text_file.write(container.logs()) - text_file.close() - container.remove() - self.remove_mount_dir() - - def create_mount_dir(self): - if not os.path.exists(self.mount_path): - os.makedirs(self.mount_path) - - def remove_mount_dir(self): - shutil.rmtree(self.mount_path) - - def file_exist_and_not_empty(self, path_to_file): - return os.path.isfile(path_to_file) and os.path.getsize(path_to_file) > 0 - - @staticmethod - def get_pkcs12(pass_file_path, p12_file_path): - # Load PKCS12 Object - password = open(pass_file_path, 'rb').read() - return crypto.load_pkcs12(open(p12_file_path, 'rb').read(), password) - - @staticmethod - def execute_bash_config(cmd, container_name): - # Run command with arguments. Wait for command to complete or timeout, return code attribute. - try: - resp_code = subprocess.call(["%s %s" % (cmd, container_name)], shell=True, stdout=DEV_NULL, - stderr=subprocess.STDOUT) - print("Response Code from Config.sh execution: {0}".format(resp_code)) - return resp_code - except subprocess.CalledProcessError as e: - print("CalledProcessError Certificate installation failed in SDNC-ODL Container: {0}".format(e)) - return 1 # Return Error Code - - @staticmethod - def get_container_logs(container_name): - client = docker.from_env() - container = client.containers.get(container_name) - logs = container.logs() - return logs - - @staticmethod - def read_env_list_from_file(path): - f = open(path, "r") - r_list = [] - for line in f: - line = line.strip() - if line[0] != "#": - r_list.append(line) - return r_list |