aboutsummaryrefslogtreecommitdiffstats
path: root/tests/oom-platform-cert-service/postprocessor/libraries
diff options
context:
space:
mode:
Diffstat (limited to 'tests/oom-platform-cert-service/postprocessor/libraries')
-rw-r--r--tests/oom-platform-cert-service/postprocessor/libraries/EnvsReader.py11
-rw-r--r--tests/oom-platform-cert-service/postprocessor/libraries/JksValidator.py28
-rw-r--r--tests/oom-platform-cert-service/postprocessor/libraries/PemTruststoreValidator.py19
-rw-r--r--tests/oom-platform-cert-service/postprocessor/libraries/TrustMergerManager.py47
4 files changed, 105 insertions, 0 deletions
diff --git a/tests/oom-platform-cert-service/postprocessor/libraries/EnvsReader.py b/tests/oom-platform-cert-service/postprocessor/libraries/EnvsReader.py
new file mode 100644
index 00000000..cc60eed6
--- /dev/null
+++ b/tests/oom-platform-cert-service/postprocessor/libraries/EnvsReader.py
@@ -0,0 +1,11 @@
+
+class EnvsReader:
+
+ def read_env_list_from_file(self, path):
+ f = open(path, "r")
+ r_list = []
+ for line in f:
+ line = line.strip()
+ if line[0] != "#":
+ r_list.append(line)
+ return r_list
diff --git a/tests/oom-platform-cert-service/postprocessor/libraries/JksValidator.py b/tests/oom-platform-cert-service/postprocessor/libraries/JksValidator.py
new file mode 100644
index 00000000..983f66bb
--- /dev/null
+++ b/tests/oom-platform-cert-service/postprocessor/libraries/JksValidator.py
@@ -0,0 +1,28 @@
+
+import jks
+
+class JksValidator:
+
+ def get_jks_entries(self, jks_path, password_path):
+ store = jks.KeyStore.load(jks_path, open(password_path, 'rb').read())
+ return store.entries
+
+ def assert_jks_truststores_equal(self, result_truststore_path, password_path, expected_truststore_path):
+ result_keys = self.get_jks_entries(result_truststore_path, password_path)
+ expected_keys = self.get_jks_entries(expected_truststore_path, password_path)
+ if len(result_keys) != len(expected_keys):
+ return False
+ for k in result_keys:
+ if not (k in expected_keys and result_keys[k].cert == expected_keys[k].cert):
+ return False
+ return True
+
+ def assert_jks_keystores_equal(self, result_keystore_path, password_path, expected_keystore_path):
+ result_keys = self.get_jks_entries(result_keystore_path, password_path)
+ expected_keys = self.get_jks_entries(expected_keystore_path, password_path)
+ if len(result_keys) != len(expected_keys):
+ return False
+ for k in result_keys:
+ if not (k in expected_keys and result_keys[k].pkey == expected_keys[k].pkey):
+ return False
+ return True
diff --git a/tests/oom-platform-cert-service/postprocessor/libraries/PemTruststoreValidator.py b/tests/oom-platform-cert-service/postprocessor/libraries/PemTruststoreValidator.py
new file mode 100644
index 00000000..8dc9623d
--- /dev/null
+++ b/tests/oom-platform-cert-service/postprocessor/libraries/PemTruststoreValidator.py
@@ -0,0 +1,19 @@
+import re
+
+BEGIN_CERT = "-----BEGIN CERTIFICATE-----"
+END_CERT = "-----END CERTIFICATE-----"
+
+class PemTruststoreValidator:
+
+ def assert_pem_truststores_equal(self, result_pem_path, expected_pem_path):
+ result_certs = self.get_list_of_pem_certificates(result_pem_path)
+ expected_certs = self.get_list_of_pem_certificates(expected_pem_path)
+ result_certs.sort()
+ expected_certs.sort()
+ if len(result_certs) != len(expected_certs):
+ return False
+ return result_certs == expected_certs
+
+
+ def get_list_of_pem_certificates(self, path):
+ return re.findall(BEGIN_CERT + '(.+?)' + END_CERT, open(path, 'rb').read(), re.DOTALL)
diff --git a/tests/oom-platform-cert-service/postprocessor/libraries/TrustMergerManager.py b/tests/oom-platform-cert-service/postprocessor/libraries/TrustMergerManager.py
new file mode 100644
index 00000000..f7a493c4
--- /dev/null
+++ b/tests/oom-platform-cert-service/postprocessor/libraries/TrustMergerManager.py
@@ -0,0 +1,47 @@
+import docker
+import os
+import shutil
+from EnvsReader import EnvsReader
+from docker.types import Mount
+
+ARCHIVES_PATH = os.getenv("WORKSPACE") + "/archives/"
+
+
+class TrustMergerManager:
+
+ def __init__(self, mount_path, truststores_path):
+ self.mount_path = mount_path
+ self.truststores_path = truststores_path
+
+ def run_merger_container(self, merger_image, merger_name, path_to_env):
+ self.remove_mount_dir()
+ shutil.copytree(self.truststores_path, self.mount_path)
+ client = docker.from_env()
+ environment = EnvsReader().read_env_list_from_file(path_to_env)
+ container = client.containers.run(
+ image=merger_image,
+ name=merger_name,
+ environment=environment,
+ user='root', # Run container as root to avoid permission issues with volume mount access
+ mounts=[Mount(target='/var/certs', source=self.mount_path, type='bind')],
+ detach=True
+ )
+ exitcode = container.wait()
+ return exitcode
+
+ def create_mount_dir(self):
+ if not os.path.exists(self.mount_path):
+ os.makedirs(self.mount_path)
+
+ def remove_mount_dir(self):
+ if os.path.exists(self.mount_path):
+ shutil.rmtree(self.mount_path)
+
+ def remove_merger_container_and_save_logs(self, container_name, log_file_name):
+ client = docker.from_env()
+ container = client.containers.get(container_name)
+ text_file = open(ARCHIVES_PATH + "merger_container_" + log_file_name + ".log", "w")
+ text_file.write(container.logs())
+ text_file.close()
+ container.remove()
+ self.remove_mount_dir()