aboutsummaryrefslogtreecommitdiffstats
path: root/security/onap_security/security_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'security/onap_security/security_tests.py')
-rw-r--r--security/onap_security/security_tests.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/security/onap_security/security_tests.py b/security/onap_security/security_tests.py
new file mode 100644
index 0000000..4136f66
--- /dev/null
+++ b/security/onap_security/security_tests.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2018 All rights reserved
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+"""
+Define the parent for Kubernetes testing.
+"""
+
+from __future__ import division
+
+import logging
+import subprocess
+import time
+
+from xtesting.core import testcase
+from kubernetes import client, config
+
+
+class SecurityTesting(testcase.TestCase):
+ """Security test runner"""
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ super(SecurityTesting, self).__init__(**kwargs)
+ self.cmd = []
+ self.result = 0
+ self.details = {}
+ self.start_time = 0
+ self.stop_time = 0
+ self.error_string = ""
+
+ def run_security(self): # pylint: disable=too-many-branches
+ """Run the test suites"""
+ cmd_line = self.cmd
+ self.__logger.info("Starting k8s test: '%s'.", cmd_line)
+
+ process = subprocess.Popen(cmd_line, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ process.wait()
+ output = process.stdout.read().decode("utf-8")
+ if ('Error loading client' in output or
+ 'Unexpected error' in output):
+ raise Exception(output)
+
+ # create a log file
+ file_name = "/var/lib/xtesting/results/" + self.case_name + ".log"
+ log_file = open(file_name, "w")
+ log_file.write(output)
+ log_file.close()
+
+ # we consider the command return code for success criteria
+ if process.returncode is None:
+ success = False
+ details = self.error_string
+ if (self.case_name == 'kube_hunter' and
+ "No vulnerabilities were found" in output):
+ success = True
+ elif process.returncode != 0:
+ success = False
+ details = self.error_string
+ else:
+ success = True
+ details = "Test PASS"
+
+ self.details = details
+ self.__logger.info("details: %s", details)
+
+ if success:
+ self.result = 100
+ else:
+ self.result = 0
+
+ def run(self, **kwargs):
+ """Generic Run."""
+
+ self.start_time = time.time()
+ try:
+ self.run_security()
+ res = self.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Error with running Security tests:")
+ res = self.EX_RUN_ERROR
+
+ self.stop_time = time.time()
+ return res
+
+
+class OnapSecurityDockerRootTest(SecurityTesting):
+ """Test that the dockers launched as root."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'root_pods')
+ super(OnapSecurityDockerRootTest, self).__init__(**kwargs)
+ self.cmd = ['/check_security_root.sh', 'onap', '-l', '/root_pods_xfail.txt']
+ self.error_string = "Pods launched with root users"
+
+
+class OnapSecurityUnlimittedPodTest(SecurityTesting):
+ """Check that no pod is launch without limits."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'unlimitted_pods')
+ super(OnapSecurityUnlimittedPodTest, self).__init__(**kwargs)
+ self.cmd = ['/check_unlimitted_pods.sh']
+ self.error_string = "Pods lauched without limits"
+
+
+class OnapSecurityCisKubernetes(SecurityTesting):
+ """Check that kubernetes install is CIS compliant"""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'cis_kubernetes')
+ super(OnapSecurityCisKubernetes, self).__init__(**kwargs)
+ self.cmd = ['/check_cis_kubernetes.sh']
+ self.error_string = "Kubernetes Deployment is not CIS compatible"
+
+
+class OnapSecurityHttpPorts(SecurityTesting):
+ """Check all ports exposed outside of kubernetes cluster looking for plain
+ http endpoint."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'http_public_endpoints')
+ super(OnapSecurityHttpPorts, self).__init__(**kwargs)
+ self.cmd = ['/check_for_nonssl_endpoints.sh', 'onap', '-l', '/nonssl_xfail.txt']
+ self.error_string = "Public http endpoints still found"
+
+
+class OnapSecurityNonSSLPorts(SecurityTesting):
+ """Check that all ports exposed outside of kubernetes cluster use SSL
+ tunnels."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'nonssl_endpoints')
+ super(OnapSecurityNonSSLPorts, self).__init__(**kwargs)
+ self.cmd = ['/usr/local/bin/sslendpoints', '-xfail', '/nonssl_xfail.txt']
+ self.error_string = "Public non-SSL endpoints still found"
+
+
+class OnapSecurityJdwpPorts(SecurityTesting):
+ """Check that no jdwp ports are exposed."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'jdpw_ports')
+ super(OnapSecurityJdwpPorts, self).__init__(**kwargs)
+ self.cmd = ['/check_for_jdwp.sh', 'onap', '-l', '/jdwp_xfail.txt']
+ self.error_string = "JDWP ports found"
+
+
+class OnapSecurityKubeHunter(SecurityTesting):
+ """Check k8s vulnerabilities."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs.get("case_name", 'kube_hunter')
+ super(OnapSecurityKubeHunter, self).__init__(**kwargs)
+ config.load_kube_config(config_file='/root/.kube/config')
+ client_kubernetes = client.CoreV1Api()
+ node_list = client_kubernetes.list_node()
+ kube_hunter_cmd = ['/kube-hunter/kube-hunter.py', '--remote']
+ for i in node_list.items:
+ addresses = i.status.addresses
+ for j in addresses:
+ if "External" in str(j):
+ kube_hunter_cmd.append(j.address)
+ self.cmd = kube_hunter_cmd
+ self.error_string = "Vulnerabilties detected."