diff options
Diffstat (limited to 'test/security/check_certificates/check_certificates/check_certificates_validity.py')
-rw-r--r-- | test/security/check_certificates/check_certificates/check_certificates_validity.py | 38 |
1 files changed, 34 insertions, 4 deletions
diff --git a/test/security/check_certificates/check_certificates/check_certificates_validity.py b/test/security/check_certificates/check_certificates/check_certificates_validity.py index a6fd9cd1b..5d19a7390 100644 --- a/test/security/check_certificates/check_certificates/check_certificates_validity.py +++ b/test/security/check_certificates/check_certificates/check_certificates_validity.py @@ -46,6 +46,10 @@ import OpenSSL from datetime import datetime from kubernetes import client, config from jinja2 import Environment, FileSystemLoader, select_autoescape +from socket import * # pylint: disable=W0614 + +# Set SSL timeout +setdefaulttimeout(10) # Logger LOG_LEVEL = 'INFO' @@ -56,6 +60,7 @@ CERT_MODES = ['nodeport', 'ingress', 'internal'] EXP_CRITERIA_MIN = 30 EXP_CRITERIA_MAX = 389 EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9" +EXPECTED_STRIMZI_CA_CERT_STRING = "O=io.strimzi;CN=cluster-ca v0" RESULT_PATH = "." @@ -89,6 +94,10 @@ args = parser.parse_args() onap_namespace = args.namespace LOGGER.info("Verification of the %s certificates started", onap_namespace) +# Create the target dir (in case it does not exist) +if os.pardir not in args.dir: + os.makedirs(args.dir, exist_ok=True) + # Nodeport specific section # Retrieve the kubernetes IP for mode nodeport if args.mode == "nodeport": @@ -115,10 +124,13 @@ if args.mode == "nodeport": # Kubernetes section # retrieve the candidate ports first -k8s_config = config.load_kube_config() +if args.mode == "internal": + k8s_config = config.load_incluster_config() +else: + k8s_config = config.load_kube_config() core = client.CoreV1Api() -api_instance = client.ExtensionsV1beta1Api( +api_instance = client.NetworkingV1Api( client.ApiClient(k8s_config)) k8s_services = core.list_namespaced_service(onap_namespace).items k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items @@ -145,7 +157,7 @@ def get_certifificate_info(host, port): issuer_info += (issuer_info_key.decode('utf-8') + "=" + issuer_info_val.decode('utf-8') + ";") cert_validity = False - if issuer_info[:-1] == EXPECTED_CERT_STRING: + if issuer_info[:-1] in [EXPECTED_CERT_STRING, EXPECTED_STRIMZI_CA_CERT_STRING]: cert_validity = True return {'expiration_date': exp_date, @@ -186,8 +198,15 @@ def test_services(k8s_services, mode): if test_port in nodeports_xfail_list: error_waiver = True else: # internal mode - test_url = service.spec.selector.app test_port = port.port + test_url = '' + # in Internal mode there are 2 types + # app + # app.kubernetes.io/name + try: + test_url = service.spec.selector['app'] + except KeyError: + test_url = service.spec.selector['app.kubernetes.io/name'] if test_port is not None: LOGGER.info( @@ -246,6 +265,8 @@ def test_services(k8s_services, mode): {'pod_name': test_name, 'pod_port': test_port, 'error_details': str(e)}) + except: + LOGGER.error("Unknown error") # Create html summary jinja_env = Environment( @@ -259,6 +280,15 @@ def test_services(k8s_services, mode): node_ports_type_error_list=node_ports_type_error_list, node_ports_reset_error_list=node_ports_reset_error_list).dump( '{}/certificates.html'.format(args.dir)) + else: + jinja_env.get_template('cert-internal.html.j2').stream( + node_ports_list=node_ports_list, + node_ports_ssl_error_list=node_ports_ssl_error_list, + node_ports_connection_error_list=node_ports_connection_error_list, + node_ports_type_error_list=node_ports_type_error_list, + node_ports_reset_error_list=node_ports_reset_error_list).dump( + '{}/certificates.html'.format(args.dir)) + return success_criteria |