path: root/test/security/check_versions/src
diff options
Diffstat (limited to 'test/security/check_versions/src')
1 files changed, 691 insertions, 0 deletions
diff --git a/test/security/check_versions/src/k8s_bin_versions_inspector.py b/test/security/check_versions/src/k8s_bin_versions_inspector.py
new file mode 100644
index 000000000..bda7322f5
--- /dev/null
+++ b/test/security/check_versions/src/k8s_bin_versions_inspector.py
@@ -0,0 +1,691 @@
+#!/usr/bin/env python3
+# Copyright 2020 Samsung Electronics Co., Ltd.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+k8s_bin_versions_inspector is a module for verifying versions of CPython and
+OpenJDK binaries installed in the kubernetes cluster containers.
+__title__ = "k8s_bin_versions_inspector"
+__summary__ = (
+ "Module for verifying versions of CPython and OpenJDK binaries installed"
+ " in the kubernetes cluster containers."
+__version__ = "0.1.0"
+__author__ = "kkkk.k@samsung.com"
+__license__ = "Apache-2.0"
+__copyright__ = "Copyright 2020 Samsung Electronics Co., Ltd."
+from typing import Iterable, List, Optional, Pattern, Union
+import argparse
+import dataclasses
+import itertools
+import json
+import pathlib
+import pprint
+import re
+import string
+import sys
+import tabulate
+import yaml
+import cerberus
+import kubernetes
+def parse_argv(argv: Optional[List[str]] = None) -> argparse.Namespace:
+ """Function for parsing command line arguments.
+ Args:
+ argv: Unparsed list of command line arguments.
+ Returns:
+ Namespace with values from parsed arguments.
+ """
+ epilog = (
+ f"Author: {__author__}\n"
+ f"License: {__license__}\n"
+ f"Copyright: {__copyright__}\n"
+ )
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawTextHelpFormatter,
+ prog=__title__,
+ description=__summary__,
+ epilog=epilog,
+ add_help=False,
+ )
+ parser.add_argument("-c", "--config-file", help="Name of the kube-config file.")
+ parser.add_argument(
+ "-s",
+ "--field-selector",
+ default="",
+ help="Kubernetes field selector, to filter out containers objects.",
+ )
+ parser.add_argument(
+ "-o",
+ "--output-file",
+ type=pathlib.Path,
+ help="Path to file, where output will be saved.",
+ )
+ parser.add_argument(
+ "-f",
+ "--output-format",
+ choices=("tabulate", "pprint", "json"),
+ default="tabulate",
+ help="Format of the output file (tabulate, pprint, json).",
+ )
+ parser.add_argument(
+ "-i",
+ "--ignore-empty",
+ action="store_true",
+ help="Ignore containers without any versions.",
+ )
+ parser.add_argument(
+ "-a",
+ "--acceptable",
+ type=pathlib.Path,
+ help="Path to YAML file, with list of acceptable software versions.",
+ )
+ parser.add_argument(
+ "-d",
+ "--debug",
+ action="store_true",
+ help="Enable debugging mode in the k8s API.",
+ )
+ parser.add_argument(
+ "-q",
+ "--quiet",
+ action="store_true",
+ help="Suppress printing text on standard output.",
+ )
+ parser.add_argument(
+ "-V",
+ "--version",
+ action="version",
+ version=f"{__title__} {__version__}",
+ help="Display version information and exit.",
+ )
+ parser.add_argument(
+ "-h", "--help", action="help", help="Display this help text and exit."
+ )
+ args = parser.parse_args(argv)
+ return args
+class ContainerExtra:
+ "Data class, to storage extra informations about container."
+ running: bool
+ image: str
+ identifier: str
+class ContainerVersions:
+ "Data class, to storage software versions from container."
+ python: list
+ java: list
+class ContainerInfo:
+ "Data class, to storage multiple informations about container."
+ namespace: str
+ pod: str
+ container: str
+ extra: ContainerExtra
+ versions: ContainerVersions = None
+def is_container_running(
+ status: kubernetes.client.models.v1_container_status.V1ContainerStatus,
+) -> bool:
+ """Function to determine if k8s cluster container is in running state.
+ Args:
+ status: Single item from container_statuses list, that represents container status.
+ Returns:
+ If container is in running state.
+ """
+ if status.state.terminated:
+ return False
+ if status.state.waiting:
+ return False
+ if not status.state.running:
+ return False
+ return True
+def list_all_containers(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, field_selector: str,
+) -> Iterable[ContainerInfo]:
+ """Get list of all containers names.
+ Args:
+ api: Client of the k8s cluster API.
+ field_selector: Kubernetes field selector, to filter out containers objects.
+ Yields:
+ Objects for all containers in k8s cluster.
+ """
+ pods = api.list_pod_for_all_namespaces(field_selector=field_selector).items
+ containers_statuses = (
+ (pod.metadata.namespace, pod.metadata.name, pod.status.container_statuses)
+ for pod in pods
+ if pod.status.container_statuses
+ )
+ containers_status = (
+ itertools.product([namespace], [pod], statuses)
+ for namespace, pod, statuses in containers_statuses
+ )
+ containers_chained = itertools.chain.from_iterable(containers_status)
+ containers_fields = (
+ (
+ namespace,
+ pod,
+ status.name,
+ is_container_running(status),
+ status.image,
+ status.container_id,
+ )
+ for namespace, pod, status in containers_chained
+ )
+ container_items = (
+ ContainerInfo(
+ namespace, pod, container, ContainerExtra(running, image, identifier)
+ )
+ for namespace, pod, container, running, image, identifier in containers_fields
+ )
+ yield from container_items
+def sync_post_namespaced_pod_exec(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ container: ContainerInfo,
+ command: Union[List[str], str],
+) -> dict:
+ """Function to execute command on selected container.
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ command: Command to execute as a list of arguments or single string.
+ Returns:
+ Dictionary that store informations about command execution.
+ * stdout - Standard output captured from execution.
+ * stderr - Standard error captured from execution.
+ * error - Error object that was received from kubernetes API.
+ * code - Exit code returned by executed process
+ or -1 if container is not running
+ or -2 if other failure occurred.
+ """
+ try:
+ client = kubernetes.stream.stream(
+ api.connect_post_namespaced_pod_exec,
+ namespace=container.namespace,
+ name=container.pod,
+ container=container.container,
+ command=command,
+ stderr=True,
+ stdin=False,
+ stdout=True,
+ tty=False,
+ _request_timeout=1.0,
+ _preload_content=False,
+ )
+ except kubernetes.client.rest.ApiException:
+ if container.extra.running:
+ raise
+ return {
+ "stdout": "",
+ "stderr": "",
+ "error": {},
+ "code": -1,
+ }
+ client.run_forever(timeout=5)
+ stdout = client.read_stdout()
+ stderr = client.read_stderr()
+ error = yaml.safe_load(
+ client.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL)
+ )
+ # TODO: Is there really no better way, to check
+ # execution exit code in python k8s API client?
+ code = (
+ 0
+ if error["status"] == "Success"
+ else -2
+ if error["reason"] != "NonZeroExitCode"
+ else int(error["details"]["causes"][0]["message"])
+ )
+ return {
+ "stdout": stdout,
+ "stderr": stderr,
+ "error": error,
+ "code": code,
+ }
+def generate_python_binaries() -> List[str]:
+ """Function to generate list of names and paths for CPython binaries.
+ Returns:
+ List of names and paths, to CPython binaries.
+ """
+ dirnames = ["", "/usr/bin/", "/usr/local/bin/"]
+ majors_minors = [
+ f"{major}.{minor}" for major, minor in itertools.product("23", string.digits)
+ ]
+ suffixes = ["", "2", "3"] + majors_minors
+ basenames = [f"python{suffix}" for suffix in suffixes]
+ binaries = [f"{dir}{base}" for dir, base in itertools.product(dirnames, basenames)]
+ return binaries
+def generate_java_binaries() -> List[str]:
+ """Function to generate list of names and paths for OpenJDK binaries.
+ Returns:
+ List of names and paths, to OpenJDK binaries.
+ """
+ binaries = [
+ "java",
+ "/usr/bin/java",
+ "/usr/local/bin/java",
+ "/etc/alternatives/java",
+ "/usr/java/openjdk-14/bin/java",
+ ]
+ return binaries
+def determine_versions_abstraction(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ container: ContainerInfo,
+ binaries: List[str],
+ extractor: Pattern,
+) -> List[str]:
+ """Function to determine list of software versions, that are installed in
+ given container.
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ binaries: List of names and paths to the abstract software binaries.
+ extractor: Pattern to extract the version string from the output of the binary execution.
+ Returns:
+ List of installed software versions.
+ """
+ commands = ([binary, "--version"] for binary in binaries)
+ # TODO: This list comprehension should be parallelized
+ results = (
+ sync_post_namespaced_pod_exec(api, container, command) for command in commands
+ )
+ successes = (
+ f"{result['stdout']}{result['stderr']}"
+ for result in results
+ if result["code"] == 0
+ )
+ extractions = (extractor.search(success) for success in successes)
+ versions = sorted(
+ set(extraction.group(1) for extraction in extractions if extraction)
+ )
+ return versions
+def determine_versions_of_python(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
+) -> List[str]:
+ """Function to determine list of CPython versions,
+ that are installed in given container.
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ Returns:
+ List of installed CPython versions.
+ """
+ extractor = re.compile("Python ([0-9.]+)")
+ binaries = generate_python_binaries()
+ versions = determine_versions_abstraction(api, container, binaries, extractor)
+ return versions
+def determine_versions_of_java(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
+) -> List[str]:
+ """Function to determine list of OpenJDK versions,
+ that are installed in given container.
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ Returns:
+ List of installed OpenJDK versions.
+ """
+ extractor = re.compile("openjdk ([0-9.]+)")
+ binaries = generate_java_binaries()
+ versions = determine_versions_abstraction(api, container, binaries, extractor)
+ return versions
+def gather_containers_informations(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ field_selector: str,
+ ignore_empty: bool,
+) -> List[ContainerInfo]:
+ """Get list of all containers names.
+ Args:
+ api: Client of the k8s cluster API.
+ field_selector: Kubernetes field selector, to filter out containers objects.
+ ignore_empty: Determines, if containers with empty versions should be ignored.
+ Returns:
+ List of initialized objects for containers in k8s cluster.
+ """
+ containers = list(list_all_containers(api, field_selector))
+ # TODO: This loop should be parallelized
+ for container in containers:
+ python_versions = determine_versions_of_python(api, container)
+ java_versions = determine_versions_of_java(api, container)
+ container.versions = ContainerVersions(python_versions, java_versions)
+ if ignore_empty:
+ containers = [c for c in containers if c.versions.python or c.versions.java]
+ return containers
+def generate_output_tabulate(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in tabulate format.
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ Returns:
+ Output string formatted by tabulate module.
+ """
+ headers = [
+ "Namespace",
+ "Pod",
+ "Container",
+ "Running",
+ "CPython",
+ "OpenJDK",
+ ]
+ rows = [
+ [
+ container.namespace,
+ container.pod,
+ container.container,
+ container.extra.running,
+ " ".join(container.versions.python),
+ " ".join(container.versions.java),
+ ]
+ for container in containers
+ ]
+ output = tabulate.tabulate(rows, headers=headers)
+ return output
+def generate_output_pprint(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in pprint format.
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ Returns:
+ Output string formatted by pprint module.
+ """
+ output = pprint.pformat(containers)
+ return output
+def generate_output_json(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in JSON format.
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ Returns:
+ Output string formatted by json module.
+ """
+ data = [
+ {
+ "namespace": container.namespace,
+ "pod": container.pod,
+ "container": container.container,
+ "extra": {
+ "running": container.extra.running,
+ "image": container.extra.image,
+ "identifier": container.extra.identifier,
+ },
+ "versions": {
+ "python": container.versions.python,
+ "java": container.versions.java,
+ },
+ }
+ for container in containers
+ ]
+ output = json.dumps(data, indent=4)
+ return output
+def generate_and_handle_output(
+ containers: List[ContainerInfo],
+ output_format: str,
+ output_file: pathlib.Path,
+ quiet: bool,
+) -> None:
+ """Generate and handle the output of the containers software versions.
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ output_format: String that will determine output format (tabulate, pprint, json).
+ output_file: Path to file, where output will be save.
+ quiet: Determines if output should be printed, to stdout.
+ """
+ output_generators = {
+ "tabulate": generate_output_tabulate,
+ "pprint": generate_output_pprint,
+ "json": generate_output_json,
+ }
+ output = output_generators[output_format](containers)
+ if output_file:
+ output_file.write_text(output)
+ if not quiet:
+ print(output)
+def verify_versions_acceptability(
+ containers: List[ContainerInfo], acceptable: pathlib.Path, quiet: bool
+) -> bool:
+ """Function for verification of software versions installed in containers.
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ acceptable: Path to the YAML file, with the software verification parameters.
+ quiet: Determines if output should be printed, to stdout.
+ Returns:
+ 0 if the verification was successful or 1 otherwise.
+ """
+ if not acceptable:
+ return 0
+ if not acceptable.is_file():
+ raise FileNotFoundError(
+ "File with configuration for acceptable does not exists!"
+ )
+ schema = {
+ "python": {"type": "list", "schema": {"type": "string"}},
+ "java": {"type": "list", "schema": {"type": "string"}},
+ }
+ validator = cerberus.Validator(schema)
+ with open(acceptable) as stream:
+ data = yaml.safe_load(stream)
+ if not validator.validate(data):
+ raise cerberus.SchemaError(
+ "Schema of file with configuration for acceptable is not valid."
+ )
+ python_acceptable = data.get("python", [])
+ java_acceptable = data.get("java", [])
+ python_not_acceptable = [
+ (container, "python", version)
+ for container in containers
+ for version in container.versions.python
+ if version not in python_acceptable
+ ]
+ java_not_acceptable = [
+ (container, "java", version)
+ for container in containers
+ for version in container.versions.java
+ if version not in java_acceptable
+ ]
+ if not python_not_acceptable and not java_not_acceptable:
+ return 0
+ if quiet:
+ return 1
+ print("List of not acceptable versions")
+ pprint.pprint(python_not_acceptable)
+ pprint.pprint(java_not_acceptable)
+ return 1
+def main(argv: Optional[List[str]] = None) -> str:
+ """Main entrypoint of the module for verifying versions of CPython and
+ OpenJDK installed in k8s cluster containers.
+ Args:
+ argv: List of command line arguments.
+ """
+ args = parse_argv(argv)
+ kubernetes.config.load_kube_config(args.config_file)
+ api = kubernetes.client.CoreV1Api()
+ api.api_client.configuration.debug = args.debug
+ containers = gather_containers_informations(
+ api, args.field_selector, args.ignore_empty
+ )
+ generate_and_handle_output(
+ containers, args.output_format, args.output_file, args.quiet
+ )
+ code = verify_versions_acceptability(containers, args.acceptable, args.quiet)
+ return code
+if __name__ == "__main__":
+ sys.exit(main())