diff options
Diffstat (limited to 'security/onap_security/security_tests.py')
-rw-r--r-- | security/onap_security/security_tests.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/security/onap_security/security_tests.py b/security/onap_security/security_tests.py new file mode 100644 index 0000000..4136f66 --- /dev/null +++ b/security/onap_security/security_tests.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python +# +# Copyright (c) 2018 All rights reserved +# This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# + +""" +Define the parent for Kubernetes testing. +""" + +from __future__ import division + +import logging +import subprocess +import time + +from xtesting.core import testcase +from kubernetes import client, config + + +class SecurityTesting(testcase.TestCase): + """Security test runner""" + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + super(SecurityTesting, self).__init__(**kwargs) + self.cmd = [] + self.result = 0 + self.details = {} + self.start_time = 0 + self.stop_time = 0 + self.error_string = "" + + def run_security(self): # pylint: disable=too-many-branches + """Run the test suites""" + cmd_line = self.cmd + self.__logger.info("Starting k8s test: '%s'.", cmd_line) + + process = subprocess.Popen(cmd_line, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + process.wait() + output = process.stdout.read().decode("utf-8") + if ('Error loading client' in output or + 'Unexpected error' in output): + raise Exception(output) + + # create a log file + file_name = "/var/lib/xtesting/results/" + self.case_name + ".log" + log_file = open(file_name, "w") + log_file.write(output) + log_file.close() + + # we consider the command return code for success criteria + if process.returncode is None: + success = False + details = self.error_string + if (self.case_name == 'kube_hunter' and + "No vulnerabilities were found" in output): + success = True + elif process.returncode != 0: + success = False + details = self.error_string + else: + success = True + details = "Test PASS" + + self.details = details + self.__logger.info("details: %s", details) + + if success: + self.result = 100 + else: + self.result = 0 + + def run(self, **kwargs): + """Generic Run.""" + + self.start_time = time.time() + try: + self.run_security() + res = self.EX_OK + except Exception: # pylint: disable=broad-except + self.__logger.exception("Error with running Security tests:") + res = self.EX_RUN_ERROR + + self.stop_time = time.time() + return res + + +class OnapSecurityDockerRootTest(SecurityTesting): + """Test that the dockers launched as root.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'root_pods') + super(OnapSecurityDockerRootTest, self).__init__(**kwargs) + self.cmd = ['/check_security_root.sh', 'onap', '-l', '/root_pods_xfail.txt'] + self.error_string = "Pods launched with root users" + + +class OnapSecurityUnlimittedPodTest(SecurityTesting): + """Check that no pod is launch without limits.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'unlimitted_pods') + super(OnapSecurityUnlimittedPodTest, self).__init__(**kwargs) + self.cmd = ['/check_unlimitted_pods.sh'] + self.error_string = "Pods lauched without limits" + + +class OnapSecurityCisKubernetes(SecurityTesting): + """Check that kubernetes install is CIS compliant""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'cis_kubernetes') + super(OnapSecurityCisKubernetes, self).__init__(**kwargs) + self.cmd = ['/check_cis_kubernetes.sh'] + self.error_string = "Kubernetes Deployment is not CIS compatible" + + +class OnapSecurityHttpPorts(SecurityTesting): + """Check all ports exposed outside of kubernetes cluster looking for plain + http endpoint.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'http_public_endpoints') + super(OnapSecurityHttpPorts, self).__init__(**kwargs) + self.cmd = ['/check_for_nonssl_endpoints.sh', 'onap', '-l', '/nonssl_xfail.txt'] + self.error_string = "Public http endpoints still found" + + +class OnapSecurityNonSSLPorts(SecurityTesting): + """Check that all ports exposed outside of kubernetes cluster use SSL + tunnels.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'nonssl_endpoints') + super(OnapSecurityNonSSLPorts, self).__init__(**kwargs) + self.cmd = ['/usr/local/bin/sslendpoints', '-xfail', '/nonssl_xfail.txt'] + self.error_string = "Public non-SSL endpoints still found" + + +class OnapSecurityJdwpPorts(SecurityTesting): + """Check that no jdwp ports are exposed.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'jdpw_ports') + super(OnapSecurityJdwpPorts, self).__init__(**kwargs) + self.cmd = ['/check_for_jdwp.sh', 'onap', '-l', '/jdwp_xfail.txt'] + self.error_string = "JDWP ports found" + + +class OnapSecurityKubeHunter(SecurityTesting): + """Check k8s vulnerabilities.""" + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs.get("case_name", 'kube_hunter') + super(OnapSecurityKubeHunter, self).__init__(**kwargs) + config.load_kube_config(config_file='/root/.kube/config') + client_kubernetes = client.CoreV1Api() + node_list = client_kubernetes.list_node() + kube_hunter_cmd = ['/kube-hunter/kube-hunter.py', '--remote'] + for i in node_list.items: + addresses = i.status.addresses + for j in addresses: + if "External" in str(j): + kube_hunter_cmd.append(j.address) + self.cmd = kube_hunter_cmd + self.error_string = "Vulnerabilties detected." |