From 6e88d548362b32a15a094fdf8d83f082107c7962 Mon Sep 17 00:00:00 2001 From: Michal Jagiello Date: Wed, 19 Apr 2023 09:53:38 +0000 Subject: Fix security versions script That script was usused on security versions tests, so I updated it with the latest changes from repo which was really used, created needed files and after we merge it we could use that on security tests. Issue-ID: TEST-394 Signed-off-by: Michal Jagiello Change-Id: I8e5daa7d43e2723bbe3308cf85b1cae2b2f587ad --- .../src/k8s_bin_versions_inspector.py | 745 --------------------- 1 file changed, 745 deletions(-) delete mode 100644 test/security/check_versions/src/k8s_bin_versions_inspector.py (limited to 'test/security/check_versions/src') diff --git a/test/security/check_versions/src/k8s_bin_versions_inspector.py b/test/security/check_versions/src/k8s_bin_versions_inspector.py deleted file mode 100644 index f5ff53714..000000000 --- a/test/security/check_versions/src/k8s_bin_versions_inspector.py +++ /dev/null @@ -1,745 +0,0 @@ -#!/usr/bin/env python3 - -# COPYRIGHT NOTICE STARTS HERE -# -# Copyright 2020 Samsung Electronics Co., Ltd. -# Copyright 2023 Deutsche Telekom AG -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# COPYRIGHT NOTICE ENDS HERE - -""" -k8s_bin_versions_inspector is a module for verifying versions of CPython and -OpenJDK binaries installed in the kubernetes cluster containers. -""" - -__title__ = "k8s_bin_versions_inspector" -__summary__ = ( - "Module for verifying versions of CPython and OpenJDK binaries installed" - " in the kubernetes cluster containers." -) -__version__ = "0.1.0" -__author__ = "kkkk.k@samsung.com" -__license__ = "Apache-2.0" -__copyright__ = "Copyright 2020 Samsung Electronics Co., Ltd." - -from typing import Iterable, List, Optional, Pattern, Union - -import argparse -import dataclasses -import itertools -import json -import pathlib -import pprint -import re -import string -import sys -import tabulate -import yaml - -import cerberus -import kubernetes - - -def parse_argv(argv: Optional[List[str]] = None) -> argparse.Namespace: - """Function for parsing command line arguments. - - Args: - argv: Unparsed list of command line arguments. - - Returns: - Namespace with values from parsed arguments. - """ - - epilog = ( - f"Author: {__author__}\n" - f"License: {__license__}\n" - f"Copyright: {__copyright__}\n" - ) - - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, - prog=__title__, - description=__summary__, - epilog=epilog, - add_help=False, - ) - - parser.add_argument("-c", "--config-file", help="Name of the kube-config file.") - - parser.add_argument( - "-s", - "--field-selector", - default="", - help="Kubernetes field selector, to filter out containers objects.", - ) - - parser.add_argument( - "-o", - "--output-file", - type=pathlib.Path, - help="Path to file, where output will be saved.", - ) - - parser.add_argument( - "-f", - "--output-format", - choices=("tabulate", "pprint", "json"), - default="tabulate", - help="Format of the output file (tabulate, pprint, json).", - ) - - parser.add_argument( - "-i", - "--ignore-empty", - action="store_true", - help="Ignore containers without any versions.", - ) - - parser.add_argument( - "-a", - "--acceptable", - type=pathlib.Path, - help="Path to YAML file, with list of acceptable software versions.", - ) - - parser.add_argument( - "-n", - "--namespace", - help="Namespace to use to list pods." - "If empty pods are going to be listed from all namespaces" - ) - - parser.add_argument( - "--check-istio-sidecar", - action="store_true", - help="Add if you want to check istio sidecars also" - ) - - parser.add_argument( - "--istio-sidecar-name", - default="istio-proxy", - help="Name of istio sidecar to filter out" - ) - - parser.add_argument( - "-d", - "--debug", - action="store_true", - help="Enable debugging mode in the k8s API.", - ) - - parser.add_argument( - "-q", - "--quiet", - action="store_true", - help="Suppress printing text on standard output.", - ) - - parser.add_argument( - "-V", - "--version", - action="version", - version=f"{__title__} {__version__}", - help="Display version information and exit.", - ) - - parser.add_argument( - "-h", "--help", action="help", help="Display this help text and exit." - ) - - args = parser.parse_args(argv) - - return args - - -@dataclasses.dataclass -class ContainerExtra: - "Data class, to storage extra informations about container." - - running: bool - image: str - identifier: str - - -@dataclasses.dataclass -class ContainerVersions: - "Data class, to storage software versions from container." - - python: list - java: list - - -@dataclasses.dataclass -class ContainerInfo: - "Data class, to storage multiple informations about container." - - namespace: str - pod: str - container: str - extra: ContainerExtra - versions: ContainerVersions = None - - -def is_container_running( - status: kubernetes.client.models.v1_container_status.V1ContainerStatus, -) -> bool: - """Function to determine if k8s cluster container is in running state. - - Args: - status: Single item from container_statuses list, that represents container status. - - Returns: - If container is in running state. - """ - - if status.state.terminated: - return False - - if status.state.waiting: - return False - - if not status.state.running: - return False - - return True - - -def list_all_containers( - api: kubernetes.client.api.core_v1_api.CoreV1Api, - field_selector: str, - namespace: Union[None, str], - check_istio_sidecars: bool, - istio_sidecar_name: str -) -> Iterable[ContainerInfo]: - """Get list of all containers names. - - Args: - api: Client of the k8s cluster API. - field_selector: Kubernetes field selector, to filter out containers objects. - namespace: Namespace to limit reading pods from - check_istio_sidecars: Flag to enable/disable istio sidecars check. - Default to False - istio_sidecar_name: If checking istio sidecars is disabled the name to filter - containers out - - Yields: - Objects for all containers in k8s cluster. - """ - - if namespace: - pods = api.list_namespaced_pod(namespace, field_selector=field_selector).items - else: - pods = api.list_pod_for_all_namespaces(field_selector=field_selector).items - - containers_statuses = ( - (pod.metadata.namespace, pod.metadata.name, pod.status.container_statuses) - for pod in pods - if pod.status.container_statuses - ) - - containers_status = ( - itertools.product([namespace], [pod], statuses) - for namespace, pod, statuses in containers_statuses - ) - - containers_chained = itertools.chain.from_iterable(containers_status) - - containers_fields = ( - ( - namespace, - pod, - status.name, - is_container_running(status), - status.image, - status.container_id, - ) - for namespace, pod, status in containers_chained - ) - - container_items = ( - ContainerInfo( - namespace, pod, container, ContainerExtra(running, image, identifier) - ) - for namespace, pod, container, running, image, identifier in containers_fields - ) - - if not check_istio_sidecars: - container_items = filter(lambda container: container.container != istio_sidecar_name, container_items) - - yield from container_items - - -def sync_post_namespaced_pod_exec( - api: kubernetes.client.api.core_v1_api.CoreV1Api, - container: ContainerInfo, - command: Union[List[str], str], -) -> dict: - """Function to execute command on selected container. - - Args: - api: Client of the k8s cluster API. - container: Object, that represents container in k8s cluster. - command: Command to execute as a list of arguments or single string. - - Returns: - Dictionary that store informations about command execution. - * stdout - Standard output captured from execution. - * stderr - Standard error captured from execution. - * error - Error object that was received from kubernetes API. - * code - Exit code returned by executed process - or -1 if container is not running - or -2 if other failure occurred. - """ - - try: - client = kubernetes.stream.stream( - api.connect_post_namespaced_pod_exec, - namespace=container.namespace, - name=container.pod, - container=container.container, - command=command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - _request_timeout=1.0, - _preload_content=False, - ) - except ( - kubernetes.client.rest.ApiException, - kubernetes.client.exceptions.ApiException, - ): - - if container.extra.running: - raise - - return { - "stdout": "", - "stderr": "", - "error": {}, - "code": -1, - } - - client.run_forever(timeout=5) - - stdout = client.read_stdout() - stderr = client.read_stderr() - error = yaml.safe_load( - client.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL) - ) - - # TODO: Is there really no better way, to check - # execution exit code in python k8s API client? - code = -2 - try: - code = ( - 0 - if error["status"] == "Success" - else -2 - if error["reason"] != "NonZeroExitCode" - else int(error["details"]["causes"][0]["message"]) - ) - except: - pass - - return { - "stdout": stdout, - "stderr": stderr, - "error": error, - "code": code, - } - - -def generate_python_binaries() -> List[str]: - """Function to generate list of names and paths for CPython binaries. - - Returns: - List of names and paths, to CPython binaries. - """ - - dirnames = ["", "/usr/bin/", "/usr/local/bin/"] - - majors_minors = [ - f"{major}.{minor}" for major, minor in itertools.product("23", string.digits) - ] - - suffixes = ["", "2", "3"] + majors_minors - - basenames = [f"python{suffix}" for suffix in suffixes] - - binaries = [f"{dir}{base}" for dir, base in itertools.product(dirnames, basenames)] - - return binaries - - -def generate_java_binaries() -> List[str]: - """Function to generate list of names and paths for OpenJDK binaries. - - Returns: - List of names and paths, to OpenJDK binaries. - """ - - binaries = [ - "java", - "/usr/bin/java", - "/usr/local/bin/java", - "/etc/alternatives/java", - "/usr/java/openjdk-14/bin/java", - ] - - return binaries - - -def determine_versions_abstraction( - api: kubernetes.client.api.core_v1_api.CoreV1Api, - container: ContainerInfo, - binaries: List[str], - extractor: Pattern, -) -> List[str]: - """Function to determine list of software versions, that are installed in - given container. - - Args: - api: Client of the k8s cluster API. - container: Object, that represents container in k8s cluster. - binaries: List of names and paths to the abstract software binaries. - extractor: Pattern to extract the version string from the output of the binary execution. - - Returns: - List of installed software versions. - """ - - commands = ([binary, "--version"] for binary in binaries) - commands_old = ([binary, "-version"] for binary in binaries) - commands_all = itertools.chain(commands, commands_old) - - # TODO: This list comprehension should be parallelized - results = ( - sync_post_namespaced_pod_exec(api, container, command) - for command in commands_all - ) - - successes = ( - f"{result['stdout']}{result['stderr']}" - for result in results - if result["code"] == 0 - ) - - extractions = (extractor.search(success) for success in successes) - - versions = sorted( - set(extraction.group(1) for extraction in extractions if extraction) - ) - - return versions - - -def determine_versions_of_python( - api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo -) -> List[str]: - """Function to determine list of CPython versions, - that are installed in given container. - - Args: - api: Client of the k8s cluster API. - container: Object, that represents container in k8s cluster. - - Returns: - List of installed CPython versions. - """ - - extractor = re.compile("Python ([0-9.]+)") - - binaries = generate_python_binaries() - - versions = determine_versions_abstraction(api, container, binaries, extractor) - - return versions - - -def determine_versions_of_java( - api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo -) -> List[str]: - """Function to determine list of OpenJDK versions, - that are installed in given container. - - Args: - api: Client of the k8s cluster API. - container: Object, that represents container in k8s cluster. - - Returns: - List of installed OpenJDK versions. - """ - - extractor = re.compile('openjdk [version" ]*([0-9._]+)') - - binaries = generate_java_binaries() - - versions = determine_versions_abstraction(api, container, binaries, extractor) - - return versions - - -def gather_containers_informations( - api: kubernetes.client.api.core_v1_api.CoreV1Api, - field_selector: str, - ignore_empty: bool, - namespace: Union[None, str], - check_istio_sidecars: bool, - istio_sidecar_name: str -) -> List[ContainerInfo]: - """Get list of all containers names. - - Args: - api: Client of the k8s cluster API. - field_selector: Kubernetes field selector, to filter out containers objects. - ignore_empty: Determines, if containers with empty versions should be ignored. - namespace: Namespace to limit reading pods from - check_istio_sidecars: Flag to enable/disable istio sidecars check. - Default to False - istio_sidecar_name: If checking istio sidecars is disabled the name to filter - containers out - - Returns: - List of initialized objects for containers in k8s cluster. - """ - - containers = list(list_all_containers(api, field_selector, namespace, - check_istio_sidecars, istio_sidecar_name)) - - # TODO: This loop should be parallelized - for container in containers: - python_versions = determine_versions_of_python(api, container) - java_versions = determine_versions_of_java(api, container) - container.versions = ContainerVersions(python_versions, java_versions) - - if ignore_empty: - containers = [c for c in containers if c.versions.python or c.versions.java] - - return containers - - -def generate_output_tabulate(containers: Iterable[ContainerInfo]) -> str: - """Function for generate output string in tabulate format. - - Args: - containers: List of items, that represents containers in k8s cluster. - - Returns: - Output string formatted by tabulate module. - """ - - headers = [ - "Namespace", - "Pod", - "Container", - "Running", - "CPython", - "OpenJDK", - ] - - rows = [ - [ - container.namespace, - container.pod, - container.container, - container.extra.running, - " ".join(container.versions.python), - " ".join(container.versions.java), - ] - for container in containers - ] - - output = tabulate.tabulate(rows, headers=headers) - - return output - - -def generate_output_pprint(containers: Iterable[ContainerInfo]) -> str: - """Function for generate output string in pprint format. - - Args: - containers: List of items, that represents containers in k8s cluster. - - Returns: - Output string formatted by pprint module. - """ - - output = pprint.pformat(containers) - - return output - - -def generate_output_json(containers: Iterable[ContainerInfo]) -> str: - """Function for generate output string in JSON format. - - Args: - containers: List of items, that represents containers in k8s cluster. - - Returns: - Output string formatted by json module. - """ - - data = [ - { - "namespace": container.namespace, - "pod": container.pod, - "container": container.container, - "extra": { - "running": container.extra.running, - "image": container.extra.image, - "identifier": container.extra.identifier, - }, - "versions": { - "python": container.versions.python, - "java": container.versions.java, - }, - } - for container in containers - ] - - output = json.dumps(data, indent=4) - - return output - - -def generate_and_handle_output( - containers: List[ContainerInfo], - output_format: str, - output_file: pathlib.Path, - quiet: bool, -) -> None: - """Generate and handle the output of the containers software versions. - - Args: - containers: List of items, that represents containers in k8s cluster. - output_format: String that will determine output format (tabulate, pprint, json). - output_file: Path to file, where output will be save. - quiet: Determines if output should be printed, to stdout. - """ - - output_generators = { - "tabulate": generate_output_tabulate, - "pprint": generate_output_pprint, - "json": generate_output_json, - } - - output = output_generators[output_format](containers) - - if output_file: - output_file.write_text(output) - - if not quiet: - print(output) - - -def verify_versions_acceptability( - containers: List[ContainerInfo], acceptable: pathlib.Path, quiet: bool -) -> bool: - """Function for verification of software versions installed in containers. - - Args: - containers: List of items, that represents containers in k8s cluster. - acceptable: Path to the YAML file, with the software verification parameters. - quiet: Determines if output should be printed, to stdout. - - Returns: - 0 if the verification was successful or 1 otherwise. - """ - - if not acceptable: - return 0 - - if not acceptable.is_file(): - raise FileNotFoundError( - "File with configuration for acceptable does not exists!" - ) - - schema = { - "python": {"type": "list", "schema": {"type": "string"}}, - "java": {"type": "list", "schema": {"type": "string"}}, - } - - validator = cerberus.Validator(schema) - - with open(acceptable) as stream: - data = yaml.safe_load(stream) - - if not validator.validate(data): - raise cerberus.SchemaError( - "Schema of file with configuration for acceptable is not valid." - ) - - python_acceptable = data.get("python", []) - java_acceptable = data.get("java", []) - - python_not_acceptable = [ - (container, "python", version) - for container in containers - for version in container.versions.python - if version not in python_acceptable - ] - - java_not_acceptable = [ - (container, "java", version) - for container in containers - for version in container.versions.java - if version not in java_acceptable - ] - - if not python_not_acceptable and not java_not_acceptable: - return 0 - - if quiet: - return 1 - - print("List of not acceptable versions") - pprint.pprint(python_not_acceptable) - pprint.pprint(java_not_acceptable) - - return 1 - - -def main(argv: Optional[List[str]] = None) -> str: - """Main entrypoint of the module for verifying versions of CPython and - OpenJDK installed in k8s cluster containers. - - Args: - argv: List of command line arguments. - """ - - args = parse_argv(argv) - - kubernetes.config.load_kube_config(args.config_file) - - api = kubernetes.client.CoreV1Api() - api.api_client.configuration.debug = args.debug - - containers = gather_containers_informations( - api, args.field_selector, args.ignore_empty, args.namespace, - args.check_istio_sidecar, args.istio_sidecar_name - ) - - generate_and_handle_output( - containers, args.output_format, args.output_file, args.quiet - ) - - code = verify_versions_acceptability(containers, args.acceptable, args.quiet) - - return code - - -if __name__ == "__main__": - sys.exit(main()) -- cgit 1.2.3-korg