aboutsummaryrefslogtreecommitdiffstats
path: root/test/security/check_versions/src
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2023-04-19 09:53:38 +0000
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2023-04-19 09:58:02 +0000
commit6e88d548362b32a15a094fdf8d83f082107c7962 (patch)
tree8603bba306091d8a323ac4ed32920bc075998af5 /test/security/check_versions/src
parentc57b24365c08afe394e52808d55e9b70ac878205 (diff)
Fix security versions script
That script was usused on security versions tests, so I updated it with the latest changes from repo which was really used, created needed files and after we merge it we could use that on security tests. Issue-ID: TEST-394 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl> Change-Id: I8e5daa7d43e2723bbe3308cf85b1cae2b2f587ad
Diffstat (limited to 'test/security/check_versions/src')
-rw-r--r--test/security/check_versions/src/k8s_bin_versions_inspector.py745
1 files changed, 0 insertions, 745 deletions
diff --git a/test/security/check_versions/src/k8s_bin_versions_inspector.py b/test/security/check_versions/src/k8s_bin_versions_inspector.py
deleted file mode 100644
index f5ff53714..000000000
--- a/test/security/check_versions/src/k8s_bin_versions_inspector.py
+++ /dev/null
@@ -1,745 +0,0 @@
-#!/usr/bin/env python3
-
-# COPYRIGHT NOTICE STARTS HERE
-#
-# Copyright 2020 Samsung Electronics Co., Ltd.
-# Copyright 2023 Deutsche Telekom AG
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# COPYRIGHT NOTICE ENDS HERE
-
-"""
-k8s_bin_versions_inspector is a module for verifying versions of CPython and
-OpenJDK binaries installed in the kubernetes cluster containers.
-"""
-
-__title__ = "k8s_bin_versions_inspector"
-__summary__ = (
- "Module for verifying versions of CPython and OpenJDK binaries installed"
- " in the kubernetes cluster containers."
-)
-__version__ = "0.1.0"
-__author__ = "kkkk.k@samsung.com"
-__license__ = "Apache-2.0"
-__copyright__ = "Copyright 2020 Samsung Electronics Co., Ltd."
-
-from typing import Iterable, List, Optional, Pattern, Union
-
-import argparse
-import dataclasses
-import itertools
-import json
-import pathlib
-import pprint
-import re
-import string
-import sys
-import tabulate
-import yaml
-
-import cerberus
-import kubernetes
-
-
-def parse_argv(argv: Optional[List[str]] = None) -> argparse.Namespace:
- """Function for parsing command line arguments.
-
- Args:
- argv: Unparsed list of command line arguments.
-
- Returns:
- Namespace with values from parsed arguments.
- """
-
- epilog = (
- f"Author: {__author__}\n"
- f"License: {__license__}\n"
- f"Copyright: {__copyright__}\n"
- )
-
- parser = argparse.ArgumentParser(
- formatter_class=argparse.RawTextHelpFormatter,
- prog=__title__,
- description=__summary__,
- epilog=epilog,
- add_help=False,
- )
-
- parser.add_argument("-c", "--config-file", help="Name of the kube-config file.")
-
- parser.add_argument(
- "-s",
- "--field-selector",
- default="",
- help="Kubernetes field selector, to filter out containers objects.",
- )
-
- parser.add_argument(
- "-o",
- "--output-file",
- type=pathlib.Path,
- help="Path to file, where output will be saved.",
- )
-
- parser.add_argument(
- "-f",
- "--output-format",
- choices=("tabulate", "pprint", "json"),
- default="tabulate",
- help="Format of the output file (tabulate, pprint, json).",
- )
-
- parser.add_argument(
- "-i",
- "--ignore-empty",
- action="store_true",
- help="Ignore containers without any versions.",
- )
-
- parser.add_argument(
- "-a",
- "--acceptable",
- type=pathlib.Path,
- help="Path to YAML file, with list of acceptable software versions.",
- )
-
- parser.add_argument(
- "-n",
- "--namespace",
- help="Namespace to use to list pods."
- "If empty pods are going to be listed from all namespaces"
- )
-
- parser.add_argument(
- "--check-istio-sidecar",
- action="store_true",
- help="Add if you want to check istio sidecars also"
- )
-
- parser.add_argument(
- "--istio-sidecar-name",
- default="istio-proxy",
- help="Name of istio sidecar to filter out"
- )
-
- parser.add_argument(
- "-d",
- "--debug",
- action="store_true",
- help="Enable debugging mode in the k8s API.",
- )
-
- parser.add_argument(
- "-q",
- "--quiet",
- action="store_true",
- help="Suppress printing text on standard output.",
- )
-
- parser.add_argument(
- "-V",
- "--version",
- action="version",
- version=f"{__title__} {__version__}",
- help="Display version information and exit.",
- )
-
- parser.add_argument(
- "-h", "--help", action="help", help="Display this help text and exit."
- )
-
- args = parser.parse_args(argv)
-
- return args
-
-
-@dataclasses.dataclass
-class ContainerExtra:
- "Data class, to storage extra informations about container."
-
- running: bool
- image: str
- identifier: str
-
-
-@dataclasses.dataclass
-class ContainerVersions:
- "Data class, to storage software versions from container."
-
- python: list
- java: list
-
-
-@dataclasses.dataclass
-class ContainerInfo:
- "Data class, to storage multiple informations about container."
-
- namespace: str
- pod: str
- container: str
- extra: ContainerExtra
- versions: ContainerVersions = None
-
-
-def is_container_running(
- status: kubernetes.client.models.v1_container_status.V1ContainerStatus,
-) -> bool:
- """Function to determine if k8s cluster container is in running state.
-
- Args:
- status: Single item from container_statuses list, that represents container status.
-
- Returns:
- If container is in running state.
- """
-
- if status.state.terminated:
- return False
-
- if status.state.waiting:
- return False
-
- if not status.state.running:
- return False
-
- return True
-
-
-def list_all_containers(
- api: kubernetes.client.api.core_v1_api.CoreV1Api,
- field_selector: str,
- namespace: Union[None, str],
- check_istio_sidecars: bool,
- istio_sidecar_name: str
-) -> Iterable[ContainerInfo]:
- """Get list of all containers names.
-
- Args:
- api: Client of the k8s cluster API.
- field_selector: Kubernetes field selector, to filter out containers objects.
- namespace: Namespace to limit reading pods from
- check_istio_sidecars: Flag to enable/disable istio sidecars check.
- Default to False
- istio_sidecar_name: If checking istio sidecars is disabled the name to filter
- containers out
-
- Yields:
- Objects for all containers in k8s cluster.
- """
-
- if namespace:
- pods = api.list_namespaced_pod(namespace, field_selector=field_selector).items
- else:
- pods = api.list_pod_for_all_namespaces(field_selector=field_selector).items
-
- containers_statuses = (
- (pod.metadata.namespace, pod.metadata.name, pod.status.container_statuses)
- for pod in pods
- if pod.status.container_statuses
- )
-
- containers_status = (
- itertools.product([namespace], [pod], statuses)
- for namespace, pod, statuses in containers_statuses
- )
-
- containers_chained = itertools.chain.from_iterable(containers_status)
-
- containers_fields = (
- (
- namespace,
- pod,
- status.name,
- is_container_running(status),
- status.image,
- status.container_id,
- )
- for namespace, pod, status in containers_chained
- )
-
- container_items = (
- ContainerInfo(
- namespace, pod, container, ContainerExtra(running, image, identifier)
- )
- for namespace, pod, container, running, image, identifier in containers_fields
- )
-
- if not check_istio_sidecars:
- container_items = filter(lambda container: container.container != istio_sidecar_name, container_items)
-
- yield from container_items
-
-
-def sync_post_namespaced_pod_exec(
- api: kubernetes.client.api.core_v1_api.CoreV1Api,
- container: ContainerInfo,
- command: Union[List[str], str],
-) -> dict:
- """Function to execute command on selected container.
-
- Args:
- api: Client of the k8s cluster API.
- container: Object, that represents container in k8s cluster.
- command: Command to execute as a list of arguments or single string.
-
- Returns:
- Dictionary that store informations about command execution.
- * stdout - Standard output captured from execution.
- * stderr - Standard error captured from execution.
- * error - Error object that was received from kubernetes API.
- * code - Exit code returned by executed process
- or -1 if container is not running
- or -2 if other failure occurred.
- """
-
- try:
- client = kubernetes.stream.stream(
- api.connect_post_namespaced_pod_exec,
- namespace=container.namespace,
- name=container.pod,
- container=container.container,
- command=command,
- stderr=True,
- stdin=False,
- stdout=True,
- tty=False,
- _request_timeout=1.0,
- _preload_content=False,
- )
- except (
- kubernetes.client.rest.ApiException,
- kubernetes.client.exceptions.ApiException,
- ):
-
- if container.extra.running:
- raise
-
- return {
- "stdout": "",
- "stderr": "",
- "error": {},
- "code": -1,
- }
-
- client.run_forever(timeout=5)
-
- stdout = client.read_stdout()
- stderr = client.read_stderr()
- error = yaml.safe_load(
- client.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL)
- )
-
- # TODO: Is there really no better way, to check
- # execution exit code in python k8s API client?
- code = -2
- try:
- code = (
- 0
- if error["status"] == "Success"
- else -2
- if error["reason"] != "NonZeroExitCode"
- else int(error["details"]["causes"][0]["message"])
- )
- except:
- pass
-
- return {
- "stdout": stdout,
- "stderr": stderr,
- "error": error,
- "code": code,
- }
-
-
-def generate_python_binaries() -> List[str]:
- """Function to generate list of names and paths for CPython binaries.
-
- Returns:
- List of names and paths, to CPython binaries.
- """
-
- dirnames = ["", "/usr/bin/", "/usr/local/bin/"]
-
- majors_minors = [
- f"{major}.{minor}" for major, minor in itertools.product("23", string.digits)
- ]
-
- suffixes = ["", "2", "3"] + majors_minors
-
- basenames = [f"python{suffix}" for suffix in suffixes]
-
- binaries = [f"{dir}{base}" for dir, base in itertools.product(dirnames, basenames)]
-
- return binaries
-
-
-def generate_java_binaries() -> List[str]:
- """Function to generate list of names and paths for OpenJDK binaries.
-
- Returns:
- List of names and paths, to OpenJDK binaries.
- """
-
- binaries = [
- "java",
- "/usr/bin/java",
- "/usr/local/bin/java",
- "/etc/alternatives/java",
- "/usr/java/openjdk-14/bin/java",
- ]
-
- return binaries
-
-
-def determine_versions_abstraction(
- api: kubernetes.client.api.core_v1_api.CoreV1Api,
- container: ContainerInfo,
- binaries: List[str],
- extractor: Pattern,
-) -> List[str]:
- """Function to determine list of software versions, that are installed in
- given container.
-
- Args:
- api: Client of the k8s cluster API.
- container: Object, that represents container in k8s cluster.
- binaries: List of names and paths to the abstract software binaries.
- extractor: Pattern to extract the version string from the output of the binary execution.
-
- Returns:
- List of installed software versions.
- """
-
- commands = ([binary, "--version"] for binary in binaries)
- commands_old = ([binary, "-version"] for binary in binaries)
- commands_all = itertools.chain(commands, commands_old)
-
- # TODO: This list comprehension should be parallelized
- results = (
- sync_post_namespaced_pod_exec(api, container, command)
- for command in commands_all
- )
-
- successes = (
- f"{result['stdout']}{result['stderr']}"
- for result in results
- if result["code"] == 0
- )
-
- extractions = (extractor.search(success) for success in successes)
-
- versions = sorted(
- set(extraction.group(1) for extraction in extractions if extraction)
- )
-
- return versions
-
-
-def determine_versions_of_python(
- api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
-) -> List[str]:
- """Function to determine list of CPython versions,
- that are installed in given container.
-
- Args:
- api: Client of the k8s cluster API.
- container: Object, that represents container in k8s cluster.
-
- Returns:
- List of installed CPython versions.
- """
-
- extractor = re.compile("Python ([0-9.]+)")
-
- binaries = generate_python_binaries()
-
- versions = determine_versions_abstraction(api, container, binaries, extractor)
-
- return versions
-
-
-def determine_versions_of_java(
- api: kubernetes.client.api.core_v1_api.CoreV1Api, container: ContainerInfo
-) -> List[str]:
- """Function to determine list of OpenJDK versions,
- that are installed in given container.
-
- Args:
- api: Client of the k8s cluster API.
- container: Object, that represents container in k8s cluster.
-
- Returns:
- List of installed OpenJDK versions.
- """
-
- extractor = re.compile('openjdk [version" ]*([0-9._]+)')
-
- binaries = generate_java_binaries()
-
- versions = determine_versions_abstraction(api, container, binaries, extractor)
-
- return versions
-
-
-def gather_containers_informations(
- api: kubernetes.client.api.core_v1_api.CoreV1Api,
- field_selector: str,
- ignore_empty: bool,
- namespace: Union[None, str],
- check_istio_sidecars: bool,
- istio_sidecar_name: str
-) -> List[ContainerInfo]:
- """Get list of all containers names.
-
- Args:
- api: Client of the k8s cluster API.
- field_selector: Kubernetes field selector, to filter out containers objects.
- ignore_empty: Determines, if containers with empty versions should be ignored.
- namespace: Namespace to limit reading pods from
- check_istio_sidecars: Flag to enable/disable istio sidecars check.
- Default to False
- istio_sidecar_name: If checking istio sidecars is disabled the name to filter
- containers out
-
- Returns:
- List of initialized objects for containers in k8s cluster.
- """
-
- containers = list(list_all_containers(api, field_selector, namespace,
- check_istio_sidecars, istio_sidecar_name))
-
- # TODO: This loop should be parallelized
- for container in containers:
- python_versions = determine_versions_of_python(api, container)
- java_versions = determine_versions_of_java(api, container)
- container.versions = ContainerVersions(python_versions, java_versions)
-
- if ignore_empty:
- containers = [c for c in containers if c.versions.python or c.versions.java]
-
- return containers
-
-
-def generate_output_tabulate(containers: Iterable[ContainerInfo]) -> str:
- """Function for generate output string in tabulate format.
-
- Args:
- containers: List of items, that represents containers in k8s cluster.
-
- Returns:
- Output string formatted by tabulate module.
- """
-
- headers = [
- "Namespace",
- "Pod",
- "Container",
- "Running",
- "CPython",
- "OpenJDK",
- ]
-
- rows = [
- [
- container.namespace,
- container.pod,
- container.container,
- container.extra.running,
- " ".join(container.versions.python),
- " ".join(container.versions.java),
- ]
- for container in containers
- ]
-
- output = tabulate.tabulate(rows, headers=headers)
-
- return output
-
-
-def generate_output_pprint(containers: Iterable[ContainerInfo]) -> str:
- """Function for generate output string in pprint format.
-
- Args:
- containers: List of items, that represents containers in k8s cluster.
-
- Returns:
- Output string formatted by pprint module.
- """
-
- output = pprint.pformat(containers)
-
- return output
-
-
-def generate_output_json(containers: Iterable[ContainerInfo]) -> str:
- """Function for generate output string in JSON format.
-
- Args:
- containers: List of items, that represents containers in k8s cluster.
-
- Returns:
- Output string formatted by json module.
- """
-
- data = [
- {
- "namespace": container.namespace,
- "pod": container.pod,
- "container": container.container,
- "extra": {
- "running": container.extra.running,
- "image": container.extra.image,
- "identifier": container.extra.identifier,
- },
- "versions": {
- "python": container.versions.python,
- "java": container.versions.java,
- },
- }
- for container in containers
- ]
-
- output = json.dumps(data, indent=4)
-
- return output
-
-
-def generate_and_handle_output(
- containers: List[ContainerInfo],
- output_format: str,
- output_file: pathlib.Path,
- quiet: bool,
-) -> None:
- """Generate and handle the output of the containers software versions.
-
- Args:
- containers: List of items, that represents containers in k8s cluster.
- output_format: String that will determine output format (tabulate, pprint, json).
- output_file: Path to file, where output will be save.
- quiet: Determines if output should be printed, to stdout.
- """
-
- output_generators = {
- "tabulate": generate_output_tabulate,
- "pprint": generate_output_pprint,
- "json": generate_output_json,
- }
-
- output = output_generators[output_format](containers)
-
- if output_file:
- output_file.write_text(output)
-
- if not quiet:
- print(output)
-
-
-def verify_versions_acceptability(
- containers: List[ContainerInfo], acceptable: pathlib.Path, quiet: bool
-) -> bool:
- """Function for verification of software versions installed in containers.
-
- Args:
- containers: List of items, that represents containers in k8s cluster.
- acceptable: Path to the YAML file, with the software verification parameters.
- quiet: Determines if output should be printed, to stdout.
-
- Returns:
- 0 if the verification was successful or 1 otherwise.
- """
-
- if not acceptable:
- return 0
-
- if not acceptable.is_file():
- raise FileNotFoundError(
- "File with configuration for acceptable does not exists!"
- )
-
- schema = {
- "python": {"type": "list", "schema": {"type": "string"}},
- "java": {"type": "list", "schema": {"type": "string"}},
- }
-
- validator = cerberus.Validator(schema)
-
- with open(acceptable) as stream:
- data = yaml.safe_load(stream)
-
- if not validator.validate(data):
- raise cerberus.SchemaError(
- "Schema of file with configuration for acceptable is not valid."
- )
-
- python_acceptable = data.get("python", [])
- java_acceptable = data.get("java", [])
-
- python_not_acceptable = [
- (container, "python", version)
- for container in containers
- for version in container.versions.python
- if version not in python_acceptable
- ]
-
- java_not_acceptable = [
- (container, "java", version)
- for container in containers
- for version in container.versions.java
- if version not in java_acceptable
- ]
-
- if not python_not_acceptable and not java_not_acceptable:
- return 0
-
- if quiet:
- return 1
-
- print("List of not acceptable versions")
- pprint.pprint(python_not_acceptable)
- pprint.pprint(java_not_acceptable)
-
- return 1
-
-
-def main(argv: Optional[List[str]] = None) -> str:
- """Main entrypoint of the module for verifying versions of CPython and
- OpenJDK installed in k8s cluster containers.
-
- Args:
- argv: List of command line arguments.
- """
-
- args = parse_argv(argv)
-
- kubernetes.config.load_kube_config(args.config_file)
-
- api = kubernetes.client.CoreV1Api()
- api.api_client.configuration.debug = args.debug
-
- containers = gather_containers_informations(
- api, args.field_selector, args.ignore_empty, args.namespace,
- args.check_istio_sidecar, args.istio_sidecar_name
- )
-
- generate_and_handle_output(
- containers, args.output_format, args.output_file, args.quiet
- )
-
- code = verify_versions_acceptability(containers, args.acceptable, args.quiet)
-
- return code
-
-
-if __name__ == "__main__":
- sys.exit(main())