aboutsummaryrefslogtreecommitdiffstats
path: root/test/security/check_versions/src
diff options
context:
space:
mode:
authorPawel Wieczorek <p.wieczorek2@samsung.com>2020-07-16 16:15:06 +0200
committerPawel Wieczorek <p.wieczorek2@samsung.com>2020-07-28 15:06:43 +0200
commit30e199a70b32a6256c2a148eec870800ef1fbefc (patch)
tree148c814d55b920f04a1fa0ce5f4a68896f51ef6f /test/security/check_versions/src
parent3301d5325c59d3e721fd2ec341318c5a0ede0b0c (diff)
Import upstream component version inspection tool
This patch adds utility to check versions of binaries available in Docker containers run on Kubernetes cluster. It has been contributed by: kkkk-k <kkkk.k@samsung.com> Several minor changes were made to comply with ONAP CI linter rules. Issue-ID: INT-1571 Change-Id: Id0e4b557212dec1bf8d2bac580968d69e2cf5595 Signed-off-by: Pawel Wieczorek <p.wieczorek2@samsung.com>
Diffstat (limited to 'test/security/check_versions/src')
-rw-r--r--test/security/check_versions/src/k8s_bin_versions_inspector.py691
1 files changed, 691 insertions, 0 deletions
diff --git a/test/security/check_versions/src/k8s_bin_versions_inspector.py b/test/security/check_versions/src/k8s_bin_versions_inspector.py
new file mode 100644
index 000000000..bda7322f5
--- /dev/null
+++ b/test/security/check_versions/src/k8s_bin_versions_inspector.py
@@ -0,0 +1,691 @@
+#!/usr/bin/env python3
+
+# COPYRIGHT NOTICE STARTS HERE
+#
+# Copyright 2020 Samsung Electronics Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# COPYRIGHT NOTICE ENDS HERE
+
+"""
+k8s_bin_versions_inspector is a module for verifying versions of CPython and
+OpenJDK binaries installed in the kubernetes cluster containers.
+"""
+
+__title__ = "k8s_bin_versions_inspector"
+__summary__ = (
+ "Module for verifying versions of CPython and OpenJDK binaries installed"
+ " in the kubernetes cluster containers."
+)
+__version__ = "0.1.0"
+__author__ = "kkkk.k@samsung.com"
+__license__ = "Apache-2.0"
+__copyright__ = "Copyright 2020 Samsung Electronics Co., Ltd."
+
+from typing import Iterable, List, Optional, Pattern, Union
+
+import argparse
+import dataclasses
+import itertools
+import json
+import pathlib
+import pprint
+import re
+import string
+import sys
+import tabulate
+import yaml
+
+import cerberus
+import kubernetes
+
+
+def parse_argv(argv: Optional[List[str]] = None) -> argparse.Namespace:
+ """Function for parsing command line arguments.
+
+ Args:
+ argv: Unparsed list of command line arguments.
+
+ Returns:
+ Namespace with values from parsed arguments.
+ """
+
+ epilog = (
+ f"Author: {__author__}\n"
+ f"License: {__license__}\n"
+ f"Copyright: {__copyright__}\n"
+ )
+
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawTextHelpFormatter,
+ prog=__title__,
+ description=__summary__,
+ epilog=epilog,
+ add_help=False,
+ )
+
+ parser.add_argument("-c", "--config-file", help="Name of the kube-config file.")
+
+ parser.add_argument(
+ "-s",
+ "--field-selector",
+ default="",
+ help="Kubernetes field selector, to filter out containers objects.",
+ )
+
+ parser.add_argument(
+ "-o",
+ "--output-file",
+ type=pathlib.Path,
+ help="Path to file, where output will be saved.",
+ )
+
+ parser.add_argument(
+ "-f",
+ "--output-format",
+ choices=("tabulate", "pprint", "json"),
+ default="tabulate",
+ help="Format of the output file (tabulate, pprint, json).",
+ )
+
+ parser.add_argument(
+ "-i",
+ "--ignore-empty",
+ action="store_true",
+ help="Ignore containers without any versions.",
+ )
+
+ parser.add_argument(
+ "-a",
+ "--acceptable",
+ type=pathlib.Path,
+ help="Path to YAML file, with list of acceptable software versions.",
+ )
+
+ parser.add_argument(
+ "-d",
+ "--debug",
+ action="store_true",
+ help="Enable debugging mode in the k8s API.",
+ )
+
+ parser.add_argument(
+ "-q",
+ "--quiet",
+ action="store_true",
+ help="Suppress printing text on standard output.",
+ )
+
+ parser.add_argument(
+ "-V",
+ "--version",
+ action="version",
+ version=f"{__title__} {__version__}",
+ help="Display version information and exit.",
+ )
+
+ parser.add_argument(
+ "-h", "--help", action="help", help="Display this help text and exit."
+ )
+
+ args = parser.parse_args(argv)
+
+ return args
+
+
+@dataclasses.dataclass
+class ContainerExtra:
+ "Data class, to storage extra informations about container."
+
+ running: bool
+ image: str
+ identifier: str
+
+
+@dataclasses.dataclass
+class ContainerVersions:
+ "Data class, to storage software versions from container."
+
+ python: list
+ java: list
+
+
+@dataclasses.dataclass
+class ContainerInfo:
+ "Data class, to storage multiple informations about container."
+
+ namespace: str
+ pod: str
+ container: str
+ extra: ContainerExtra
+ versions: ContainerVersions = None
+
+
+def is_container_running(
+ status: kubernetes.client.models.v1_container_status.V1ContainerStatus,
+) -> bool:
+ """Function to determine if k8s cluster container is in running state.
+
+ Args:
+ status: Single item from container_statuses list, that represents container status.
+
+ Returns:
+ If container is in running state.
+ """
+
+ if status.state.terminated:
+ return False
+
+ if status.state.waiting:
+ return False
+
+ if not status.state.running:
+ return False
+
+ return True
+
+
+def list_all_containers(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, field_selector: str,
+) -> Iterable[ContainerInfo]:
+ """Get list of all containers names.
+
+ Args:
+ api: Client of the k8s cluster API.
+ field_selector: Kubernetes field selector, to filter out containers objects.
+
+ Yields:
+ Objects for all containers in k8s cluster.
+ """
+
+ pods = api.list_pod_for_all_namespaces(field_selector=field_selector).items
+
+ containers_statuses = (
+ (pod.metadata.namespace, pod.metadata.name, pod.status.container_statuses)
+ for pod in pods
+ if pod.status.container_statuses
+ )
+
+ containers_status = (
+ itertools.product([namespace], [pod], statuses)
+ for namespace, pod, statuses in containers_statuses
+ )
+
+ containers_chained = itertools.chain.from_iterable(containers_status)
+
+ containers_fields = (
+ (
+ namespace,
+ pod,
+ status.name,
+ is_container_running(status),
+ status.image,
+ status.container_id,
+ )
+ for namespace, pod, status in containers_chained
+ )
+
+ container_items = (
+ ContainerInfo(
+ namespace, pod, container, ContainerExtra(running, image, identifier)
+ )
+ for namespace, pod, container, running, image, identifier in containers_fields
+ )
+
+ yield from container_items
+
+
+def sync_post_namespaced_pod_exec(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ container: ContainerInfo,
+ command: Union[List[str], str],
+) -> dict:
+ """Function to execute command on selected container.
+
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ command: Command to execute as a list of arguments or single string.
+
+ Returns:
+ Dictionary that store informations about command execution.
+ * stdout - Standard output captured from execution.
+ * stderr - Standard error captured from execution.
+ * error - Error object that was received from kubernetes API.
+ * code - Exit code returned by executed process
+ or -1 if container is not running
+ or -2 if other failure occurred.
+ """
+
+ try:
+ client = kubernetes.stream.stream(
+ api.connect_post_namespaced_pod_exec,
+ namespace=container.namespace,
+ name=container.pod,
+ container=container.container,
+ command=command,
+ stderr=True,
+ stdin=False,
+ stdout=True,
+ tty=False,
+ _request_timeout=1.0,
+ _preload_content=False,
+ )
+
+ except kubernetes.client.rest.ApiException:
+
+ if container.extra.running:
+ raise
+
+ return {
+ "stdout": "",
+ "stderr": "",
+ "error": {},
+ "code": -1,
+ }
+
+ client.run_forever(timeout=5)
+
+ stdout = client.read_stdout()
+ stderr = client.read_stderr()
+ error = yaml.safe_load(
+ client.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL)
+ )
+
+ # TODO: Is there really no better way, to check
+ # execution exit code in python k8s API client?
+ code = (
+ 0
+ if error["status"] == "Success"
+ else -2
+ if error["reason"] != "NonZeroExitCode"
+ else int(error["details"]["causes"][0]["message"])
+ )
+
+ return {
+ "stdout": stdout,
+ "stderr": stderr,
+ "error": error,
+ "code": code,
+ }
+
+
+def generate_python_binaries() -> List[str]:
+ """Function to generate list of names and paths for CPython binaries.
+
+ Returns:
+ List of names and paths, to CPython binaries.
+ """
+
+ dirnames = ["", "/usr/bin/", "/usr/local/bin/"]
+
+ majors_minors = [
+ f"{major}.{minor}" for major, minor in itertools.product("23", string.digits)
+ ]
+
+ suffixes = ["", "2", "3"] + majors_minors
+
+ basenames = [f"python{suffix}" for suffix in suffixes]
+
+ binaries = [f"{dir}{base}" for dir, base in itertools.product(dirnames, basenames)]
+
+ return binaries
+
+
+def generate_java_binaries() -> List[str]:
+ """Function to generate list of names and paths for OpenJDK binaries.
+
+ Returns:
+ List of names and paths, to OpenJDK binaries.
+ """
+
+ binaries = [
+ "java",
+ "/usr/bin/java",
+ "/usr/local/bin/java",
+ "/etc/alternatives/java",
+ "/usr/java/openjdk-14/bin/java",
+ ]
+
+ return binaries
+
+
+def determine_versions_abstraction(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ container: ContainerInfo,
+ binaries: List[str],
+ extractor: Pattern,
+) -> List[str]:
+ """Function to determine list of software versions, that are installed in
+ given container.
+
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+ binaries: List of names and paths to the abstract software binaries.
+ extractor: Pattern to extract the version string from the output of the binary execution.
+
+ Returns:
+ List of installed software versions.
+ """
+
+ commands = ([binary, "--version"] for binary in binaries)
+
+ # TODO: This list comprehension should be parallelized
+ results = (
+ sync_post_namespaced_pod_exec(api, container, command) for command in commands
+ )
+
+ successes = (
+ f"{result['stdout']}{result['stderr']}"
+ for result in results
+ if result["code"] == 0
+ )
+
+ extractions = (extractor.search(success) for success in successes)
+
+ versions = sorted(
+ set(extraction.group(1) for extraction in extractions if extraction)
+ )
+
+ return versions
+
+
+def determine_versions_of_python(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
+) -> List[str]:
+ """Function to determine list of CPython versions,
+ that are installed in given container.
+
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+
+ Returns:
+ List of installed CPython versions.
+ """
+
+ extractor = re.compile("Python ([0-9.]+)")
+
+ binaries = generate_python_binaries()
+
+ versions = determine_versions_abstraction(api, container, binaries, extractor)
+
+ return versions
+
+
+def determine_versions_of_java(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
+) -> List[str]:
+ """Function to determine list of OpenJDK versions,
+ that are installed in given container.
+
+ Args:
+ api: Client of the k8s cluster API.
+ container: Object, that represents container in k8s cluster.
+
+ Returns:
+ List of installed OpenJDK versions.
+ """
+
+ extractor = re.compile("openjdk ([0-9.]+)")
+
+ binaries = generate_java_binaries()
+
+ versions = determine_versions_abstraction(api, container, binaries, extractor)
+
+ return versions
+
+
+def gather_containers_informations(
+ api: kubernetes.client.api.core_v1_api.CoreV1Api,
+ field_selector: str,
+ ignore_empty: bool,
+) -> List[ContainerInfo]:
+ """Get list of all containers names.
+
+ Args:
+ api: Client of the k8s cluster API.
+ field_selector: Kubernetes field selector, to filter out containers objects.
+ ignore_empty: Determines, if containers with empty versions should be ignored.
+
+ Returns:
+ List of initialized objects for containers in k8s cluster.
+ """
+
+ containers = list(list_all_containers(api, field_selector))
+
+ # TODO: This loop should be parallelized
+ for container in containers:
+ python_versions = determine_versions_of_python(api, container)
+ java_versions = determine_versions_of_java(api, container)
+ container.versions = ContainerVersions(python_versions, java_versions)
+
+ if ignore_empty:
+ containers = [c for c in containers if c.versions.python or c.versions.java]
+
+ return containers
+
+
+def generate_output_tabulate(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in tabulate format.
+
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+
+ Returns:
+ Output string formatted by tabulate module.
+ """
+
+ headers = [
+ "Namespace",
+ "Pod",
+ "Container",
+ "Running",
+ "CPython",
+ "OpenJDK",
+ ]
+
+ rows = [
+ [
+ container.namespace,
+ container.pod,
+ container.container,
+ container.extra.running,
+ " ".join(container.versions.python),
+ " ".join(container.versions.java),
+ ]
+ for container in containers
+ ]
+
+ output = tabulate.tabulate(rows, headers=headers)
+
+ return output
+
+
+def generate_output_pprint(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in pprint format.
+
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+
+ Returns:
+ Output string formatted by pprint module.
+ """
+
+ output = pprint.pformat(containers)
+
+ return output
+
+
+def generate_output_json(containers: Iterable[ContainerInfo]) -> str:
+ """Function for generate output string in JSON format.
+
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+
+ Returns:
+ Output string formatted by json module.
+ """
+
+ data = [
+ {
+ "namespace": container.namespace,
+ "pod": container.pod,
+ "container": container.container,
+ "extra": {
+ "running": container.extra.running,
+ "image": container.extra.image,
+ "identifier": container.extra.identifier,
+ },
+ "versions": {
+ "python": container.versions.python,
+ "java": container.versions.java,
+ },
+ }
+ for container in containers
+ ]
+
+ output = json.dumps(data, indent=4)
+
+ return output
+
+
+def generate_and_handle_output(
+ containers: List[ContainerInfo],
+ output_format: str,
+ output_file: pathlib.Path,
+ quiet: bool,
+) -> None:
+ """Generate and handle the output of the containers software versions.
+
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ output_format: String that will determine output format (tabulate, pprint, json).
+ output_file: Path to file, where output will be save.
+ quiet: Determines if output should be printed, to stdout.
+ """
+
+ output_generators = {
+ "tabulate": generate_output_tabulate,
+ "pprint": generate_output_pprint,
+ "json": generate_output_json,
+ }
+
+ output = output_generators[output_format](containers)
+
+ if output_file:
+ output_file.write_text(output)
+
+ if not quiet:
+ print(output)
+
+
+def verify_versions_acceptability(
+ containers: List[ContainerInfo], acceptable: pathlib.Path, quiet: bool
+) -> bool:
+ """Function for verification of software versions installed in containers.
+
+ Args:
+ containers: List of items, that represents containers in k8s cluster.
+ acceptable: Path to the YAML file, with the software verification parameters.
+ quiet: Determines if output should be printed, to stdout.
+
+ Returns:
+ 0 if the verification was successful or 1 otherwise.
+ """
+
+ if not acceptable:
+ return 0
+
+ if not acceptable.is_file():
+ raise FileNotFoundError(
+ "File with configuration for acceptable does not exists!"
+ )
+
+ schema = {
+ "python": {"type": "list", "schema": {"type": "string"}},
+ "java": {"type": "list", "schema": {"type": "string"}},
+ }
+
+ validator = cerberus.Validator(schema)
+
+ with open(acceptable) as stream:
+ data = yaml.safe_load(stream)
+
+ if not validator.validate(data):
+ raise cerberus.SchemaError(
+ "Schema of file with configuration for acceptable is not valid."
+ )
+
+ python_acceptable = data.get("python", [])
+ java_acceptable = data.get("java", [])
+
+ python_not_acceptable = [
+ (container, "python", version)
+ for container in containers
+ for version in container.versions.python
+ if version not in python_acceptable
+ ]
+
+ java_not_acceptable = [
+ (container, "java", version)
+ for container in containers
+ for version in container.versions.java
+ if version not in java_acceptable
+ ]
+
+ if not python_not_acceptable and not java_not_acceptable:
+ return 0
+
+ if quiet:
+ return 1
+
+ print("List of not acceptable versions")
+ pprint.pprint(python_not_acceptable)
+ pprint.pprint(java_not_acceptable)
+
+ return 1
+
+
+def main(argv: Optional[List[str]] = None) -> str:
+ """Main entrypoint of the module for verifying versions of CPython and
+ OpenJDK installed in k8s cluster containers.
+
+ Args:
+ argv: List of command line arguments.
+ """
+
+ args = parse_argv(argv)
+
+ kubernetes.config.load_kube_config(args.config_file)
+
+ api = kubernetes.client.CoreV1Api()
+ api.api_client.configuration.debug = args.debug
+
+ containers = gather_containers_informations(
+ api, args.field_selector, args.ignore_empty
+ )
+
+ generate_and_handle_output(
+ containers, args.output_format, args.output_file, args.quiet
+ )
+
+ code = verify_versions_acceptability(containers, args.acceptable, args.quiet)
+
+ return code
+
+
+if __name__ == "__main__":
+ sys.exit(main())