aboutsummaryrefslogtreecommitdiffstats
path: root/tests/aaf/certservice/libraries
diff options
context:
space:
mode:
Diffstat (limited to 'tests/aaf/certservice/libraries')
-rw-r--r--tests/aaf/certservice/libraries/ArtifactParser.py40
-rw-r--r--tests/aaf/certservice/libraries/CertClientManager.py72
-rw-r--r--tests/aaf/certservice/libraries/EnvsReader.py11
-rw-r--r--tests/aaf/certservice/libraries/JksArtifactsValidator.py45
-rw-r--r--tests/aaf/certservice/libraries/P12ArtifactsValidator.py37
-rw-r--r--tests/aaf/certservice/libraries/PemArtifactsValidator.py39
6 files changed, 0 insertions, 244 deletions
diff --git a/tests/aaf/certservice/libraries/ArtifactParser.py b/tests/aaf/certservice/libraries/ArtifactParser.py
deleted file mode 100644
index 54e8d0ff..00000000
--- a/tests/aaf/certservice/libraries/ArtifactParser.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from cryptography.x509.oid import ExtensionOID
-from cryptography import x509
-
-class ArtifactParser:
-
- def __init__(self, mount_path, ext):
- self.keystorePassPath = mount_path + '/keystore.pass'
- self.keystorePath = mount_path + '/keystore.' + ext
- self.truststorePassPath = mount_path + '/truststore.pass'
- self.truststorePath = mount_path + '/truststore.' + ext
-
- def contains_expected_data(self, data):
- expectedData = data.expectedData
- actualData = data.actualData
- return cmp(expectedData, actualData) == 0
-
- def get_owner_data_from_certificate(self, certificate):
- list = certificate.get_subject().get_components()
- return dict((k, v) for k, v in list)
-
- def get_sans(self, cert):
- extension = cert.to_cryptography().extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
- dnsList = extension.value.get_values_for_type(x509.DNSName)
- return ':'.join(map(lambda dns: dns.encode('ascii','ignore'), dnsList))
-
- def get_envs_as_dict(self, list):
- envs = self.get_list_of_pairs_by_mappings(list)
- return self.remove_nones_from_dict(envs)
-
- def remove_nones_from_dict(self, dictionary):
- return dict((k, v) for k, v in dictionary.iteritems() if k is not None)
-
- def get_list_of_pairs_by_mappings(self, list):
- mappings = self.get_mappings()
- listOfEnvs = map(lambda k: k.split('='), list)
- return dict((mappings.get(a[0]), a[1]) for a in listOfEnvs)
-
- def get_mappings(self):
- return {'COMMON_NAME':'CN', 'ORGANIZATION':'O', 'ORGANIZATION_UNIT':'OU', 'LOCATION':'L', 'STATE':'ST', 'COUNTRY':'C', 'SANS':'SANS'}
-
diff --git a/tests/aaf/certservice/libraries/CertClientManager.py b/tests/aaf/certservice/libraries/CertClientManager.py
deleted file mode 100644
index a4a0df23..00000000
--- a/tests/aaf/certservice/libraries/CertClientManager.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import docker
-import os
-import shutil
-import re
-from EnvsReader import EnvsReader
-from docker.types import Mount
-
-ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/"
-
-ERROR_API_REGEX = 'Error on API response.*[0-9]{3}'
-RESPONSE_CODE_REGEX = '[0-9]{3}'
-
-
-class CertClientManager:
-
- def __init__(self, mount_path, truststore_path):
- self.mount_path = mount_path
- self.truststore_path = truststore_path
-
- def run_client_container(self, client_image, container_name, path_to_env, request_url, network):
- self.create_mount_dir()
- client = docker.from_env()
- environment = EnvsReader().read_env_list_from_file(path_to_env)
- environment.append("REQUEST_URL=" + request_url)
- container = client.containers.run(
- image=client_image,
- name=container_name,
- environment=environment,
- network=network,
- user='root', # Run container as root to avoid permission issues with volume mount access
- mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind'),
- Mount(target='/etc/onap/aaf/certservice/certs/', source=self.truststore_path, type='bind')],
- detach=True
- )
- exitcode = container.wait()
- return exitcode
-
- def remove_client_container_and_save_logs(self, container_name, log_file_name):
- client = docker.from_env()
- container = client.containers.get(container_name)
- text_file = open(ARCHIVES_PATH + "client_container_" + log_file_name + ".log", "w")
- text_file.write(container.logs())
- text_file.close()
- container.remove()
- self.remove_mount_dir()
-
- def create_mount_dir(self):
- if not os.path.exists(self.mount_path):
- os.makedirs(self.mount_path)
-
- def remove_mount_dir(self):
- shutil.rmtree(self.mount_path)
-
- def can_find_api_response_in_logs(self, container_name):
- logs = self.get_container_logs(container_name)
- api_logs = re.findall(ERROR_API_REGEX, logs)
- if api_logs:
- return True
- else:
- return False
-
- def get_api_response_from_logs(self, container_name):
- logs = self.get_container_logs(container_name)
- error_api_message = re.findall(ERROR_API_REGEX, logs)
- code = re.findall(RESPONSE_CODE_REGEX, error_api_message[0])
- return code[0]
-
- def get_container_logs(self, container_name):
- client = docker.from_env()
- container = client.containers.get(container_name)
- logs = container.logs()
- return logs
diff --git a/tests/aaf/certservice/libraries/EnvsReader.py b/tests/aaf/certservice/libraries/EnvsReader.py
deleted file mode 100644
index cc60eed6..00000000
--- a/tests/aaf/certservice/libraries/EnvsReader.py
+++ /dev/null
@@ -1,11 +0,0 @@
-
-class EnvsReader:
-
- def read_env_list_from_file(self, path):
- f = open(path, "r")
- r_list = []
- for line in f:
- line = line.strip()
- if line[0] != "#":
- r_list.append(line)
- return r_list
diff --git a/tests/aaf/certservice/libraries/JksArtifactsValidator.py b/tests/aaf/certservice/libraries/JksArtifactsValidator.py
deleted file mode 100644
index e2fdde91..00000000
--- a/tests/aaf/certservice/libraries/JksArtifactsValidator.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import jks
-from OpenSSL import crypto
-from cryptography import x509
-from cryptography.hazmat.backends import default_backend
-from EnvsReader import EnvsReader
-from ArtifactParser import ArtifactParser
-
-class JksArtifactsValidator:
-
- def __init__(self, mount_path):
- self.parser = ArtifactParser(mount_path, "jks")
-
- def get_and_compare_data_jks(self, path_to_env):
- data = self.get_data_jks(path_to_env)
- return data, self.parser.contains_expected_data(data)
-
- def get_keystore(self):
- keystore = jks.KeyStore.load(self.parser.keystorePath, open(self.parser.keystorePassPath, 'rb').read())
- return keystore.private_keys['certificate'].cert_chain[0][1]
-
- def get_truststore(self):
- truststore = jks.KeyStore.load(self.parser.truststorePath, open(self.parser.truststorePassPath, 'rb').read())
- return truststore.certs
-
- def can_open_keystore_and_truststore_with_pass_jks(self):
- try:
- jks.KeyStore.load(self.parser.keystorePath, open(self.parser.keystorePassPath, 'rb').read())
- jks.KeyStore.load(self.parser.truststorePath, open(self.parser.truststorePassPath, 'rb').read())
- return True
- except:
- return False
-
- def get_data_jks(self, path_to_env):
- envs = self.parser.get_envs_as_dict(EnvsReader().read_env_list_from_file(path_to_env))
- certificate = self.get_keystore_certificate()
- data = self.parser.get_owner_data_from_certificate(certificate)
- data['SANS'] = self.parser.get_sans(certificate)
- return type('', (object,), {"expectedData": envs, "actualData": data})
-
- def get_keystore_certificate(self):
- return crypto.X509.from_cryptography(self.load_x509_certificate(self.get_keystore()))
-
- def load_x509_certificate(self, data):
- cert = x509.load_der_x509_certificate(data, default_backend())
- return cert
diff --git a/tests/aaf/certservice/libraries/P12ArtifactsValidator.py b/tests/aaf/certservice/libraries/P12ArtifactsValidator.py
deleted file mode 100644
index b0701718..00000000
--- a/tests/aaf/certservice/libraries/P12ArtifactsValidator.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from OpenSSL import crypto
-from EnvsReader import EnvsReader
-from ArtifactParser import ArtifactParser
-
-class P12ArtifactsValidator:
-
- def __init__(self, mount_path):
- self.parser = ArtifactParser(mount_path, "p12")
-
- def get_and_compare_data_p12(self, path_to_env):
- data = self.get_data(path_to_env)
- return data, self.parser.contains_expected_data(data)
-
- def can_open_keystore_and_truststore_with_pass(self):
- can_open_keystore = self.can_open_store_file_with_pass_file(self.parser.keystorePassPath, self.parser.keystorePath)
- can_open_truststore = self.can_open_store_file_with_pass_file(self.parser.truststorePassPath, self.parser.truststorePath)
-
- return can_open_keystore & can_open_truststore
-
- def can_open_store_file_with_pass_file(self, pass_file_path, store_file_path):
- try:
- self.get_certificate(pass_file_path, store_file_path)
- return True
- except:
- return False
-
- def get_data(self, path_to_env):
- envs = self.parser.get_envs_as_dict(EnvsReader().read_env_list_from_file(path_to_env))
- certificate = self.get_certificate(self.parser.keystorePassPath, self.parser.keystorePath)
- data = self.parser.get_owner_data_from_certificate(certificate)
- data['SANS'] = self.parser.get_sans(certificate)
- return type('', (object,), {"expectedData": envs, "actualData": data})
-
- def get_certificate(self, pass_file_path, store_file_path):
- password = open(pass_file_path, 'rb').read()
- crypto.load_pkcs12(open(store_file_path, 'rb').read(), password)
- return crypto.load_pkcs12(open(store_file_path, 'rb').read(), password).get_certificate()
diff --git a/tests/aaf/certservice/libraries/PemArtifactsValidator.py b/tests/aaf/certservice/libraries/PemArtifactsValidator.py
deleted file mode 100644
index 46e0357e..00000000
--- a/tests/aaf/certservice/libraries/PemArtifactsValidator.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import os
-from OpenSSL import crypto
-from cryptography import x509
-from cryptography.hazmat.backends import default_backend
-from EnvsReader import EnvsReader
-from ArtifactParser import ArtifactParser
-
-class PemArtifactsValidator:
-
- def __init__(self, mount_path):
- self.parser = ArtifactParser(mount_path, "pem")
- self.key = mount_path + '/key.pem'
-
- def get_and_compare_data_pem(self, path_to_env):
- data = self.get_data_pem(path_to_env)
- return data, self.parser.contains_expected_data(data)
-
- def artifacts_exist_and_are_not_empty(self):
- keystoreExists = self.file_exists_and_is_not_empty(self.parser.keystorePath)
- truststoreExists = self.file_exists_and_is_not_empty(self.parser.truststorePath)
- keyExists = self.file_exists_and_is_not_empty(self.key)
- return keystoreExists and truststoreExists and keyExists
-
- def file_exists_and_is_not_empty(self, pathToFile):
- return os.path.isfile(pathToFile) and os.path.getsize(pathToFile) > 0
-
- def get_data_pem(self, path_to_env):
- envs = self.parser.get_envs_as_dict(EnvsReader().read_env_list_from_file(path_to_env))
- certificate = self.get_keystore_certificate()
- data = self.parser.get_owner_data_from_certificate(certificate)
- data['SANS'] = self.parser.get_sans(certificate)
- return type('', (object,), {"expectedData": envs, "actualData": data})
-
- def get_keystore_certificate(self):
- return crypto.X509.from_cryptography(self.load_x509_certificate())
-
- def load_x509_certificate(self):
- cert = x509.load_pem_x509_certificate(open(self.parser.keystorePath, 'rb').read(), default_backend())
- return cert