aboutsummaryrefslogtreecommitdiffstats
path: root/test/security/check_certificates/check_certificates/check_certificates_validity.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/security/check_certificates/check_certificates/check_certificates_validity.py')
-rw-r--r--test/security/check_certificates/check_certificates/check_certificates_validity.py38
1 files changed, 34 insertions, 4 deletions
diff --git a/test/security/check_certificates/check_certificates/check_certificates_validity.py b/test/security/check_certificates/check_certificates/check_certificates_validity.py
index a6fd9cd1b..5d19a7390 100644
--- a/test/security/check_certificates/check_certificates/check_certificates_validity.py
+++ b/test/security/check_certificates/check_certificates/check_certificates_validity.py
@@ -46,6 +46,10 @@ import OpenSSL
from datetime import datetime
from kubernetes import client, config
from jinja2 import Environment, FileSystemLoader, select_autoescape
+from socket import * # pylint: disable=W0614
+
+# Set SSL timeout
+setdefaulttimeout(10)
# Logger
LOG_LEVEL = 'INFO'
@@ -56,6 +60,7 @@ CERT_MODES = ['nodeport', 'ingress', 'internal']
EXP_CRITERIA_MIN = 30
EXP_CRITERIA_MAX = 389
EXPECTED_CERT_STRING = "C=US;O=ONAP;OU=OSAAF;CN=intermediateCA_9"
+EXPECTED_STRIMZI_CA_CERT_STRING = "O=io.strimzi;CN=cluster-ca v0"
RESULT_PATH = "."
@@ -89,6 +94,10 @@ args = parser.parse_args()
onap_namespace = args.namespace
LOGGER.info("Verification of the %s certificates started", onap_namespace)
+# Create the target dir (in case it does not exist)
+if os.pardir not in args.dir:
+ os.makedirs(args.dir, exist_ok=True)
+
# Nodeport specific section
# Retrieve the kubernetes IP for mode nodeport
if args.mode == "nodeport":
@@ -115,10 +124,13 @@ if args.mode == "nodeport":
# Kubernetes section
# retrieve the candidate ports first
-k8s_config = config.load_kube_config()
+if args.mode == "internal":
+ k8s_config = config.load_incluster_config()
+else:
+ k8s_config = config.load_kube_config()
core = client.CoreV1Api()
-api_instance = client.ExtensionsV1beta1Api(
+api_instance = client.NetworkingV1Api(
client.ApiClient(k8s_config))
k8s_services = core.list_namespaced_service(onap_namespace).items
k8s_ingress = api_instance.list_namespaced_ingress(onap_namespace).items
@@ -145,7 +157,7 @@ def get_certifificate_info(host, port):
issuer_info += (issuer_info_key.decode('utf-8') + "=" +
issuer_info_val.decode('utf-8') + ";")
cert_validity = False
- if issuer_info[:-1] == EXPECTED_CERT_STRING:
+ if issuer_info[:-1] in [EXPECTED_CERT_STRING, EXPECTED_STRIMZI_CA_CERT_STRING]:
cert_validity = True
return {'expiration_date': exp_date,
@@ -186,8 +198,15 @@ def test_services(k8s_services, mode):
if test_port in nodeports_xfail_list:
error_waiver = True
else: # internal mode
- test_url = service.spec.selector.app
test_port = port.port
+ test_url = ''
+ # in Internal mode there are 2 types
+ # app
+ # app.kubernetes.io/name
+ try:
+ test_url = service.spec.selector['app']
+ except KeyError:
+ test_url = service.spec.selector['app.kubernetes.io/name']
if test_port is not None:
LOGGER.info(
@@ -246,6 +265,8 @@ def test_services(k8s_services, mode):
{'pod_name': test_name,
'pod_port': test_port,
'error_details': str(e)})
+ except:
+ LOGGER.error("Unknown error")
# Create html summary
jinja_env = Environment(
@@ -259,6 +280,15 @@ def test_services(k8s_services, mode):
node_ports_type_error_list=node_ports_type_error_list,
node_ports_reset_error_list=node_ports_reset_error_list).dump(
'{}/certificates.html'.format(args.dir))
+ else:
+ jinja_env.get_template('cert-internal.html.j2').stream(
+ node_ports_list=node_ports_list,
+ node_ports_ssl_error_list=node_ports_ssl_error_list,
+ node_ports_connection_error_list=node_ports_connection_error_list,
+ node_ports_type_error_list=node_ports_type_error_list,
+ node_ports_reset_error_list=node_ports_reset_error_list).dump(
+ '{}/certificates.html'.format(args.dir))
+
return success_criteria