path: root/k8s/k8sclient/
diff options
authorMiroslav Los <>2019-12-18 18:28:59 +0100
committerMiroslav Los <>2020-01-17 19:02:50 +0100
commitb10bd92fc39633dbfffba2e5bd5c4630baf880e6 (patch)
treea58da0e1a1b53869553cb94f8619d97daf0bca09 /k8s/k8sclient/
parent0208d99f681e2a0f00a4f0cd8e7a45f41ea67c1a (diff)
Address k8s plugin code smells reported by sonar
Make 'resources' argument to k8sclient.k8sclient.deploy an optional kwarg, update its uses and document it. Split off code reported complex from deploy into functions. Tweak a nested if in tasks. Signed-off-by: Miroslav Los <> Issue-ID: DCAEGEN2-2006 Change-Id: I13a091de9207bab1c7d4eee3179263c5d994ffbf
Diffstat (limited to 'k8s/k8sclient/')
1 files changed, 103 insertions, 96 deletions
diff --git a/k8s/k8sclient/ b/k8s/k8sclient/
index 323a208..9aeec24 100644
--- a/k8s/k8sclient/
+++ b/k8s/k8sclient/
@@ -2,7 +2,7 @@
# org.onap.dcae
# ================================================================================
# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
-# Copyright (c) 2019 All rights reserved.
+# Copyright (c) 2020 All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -134,9 +134,10 @@ def _create_resources(resources=None):
return None
-def _create_container_object(name, image, always_pull, env={}, container_ports=[], volume_mounts = [], resources = None, readiness = None, liveness = None):
+def _create_container_object(name, image, always_pull, **kwargs):
# Set up environment variables
# Copy any passed in environment variables
+ env = kwargs.get('env') or {}
env_vars = [client.V1EnvVar(name=k, value=env[k]) for k in env]
# Add POD_IP with the IP address of the pod running the container
pod_ip = client.V1EnvVarSource(field_ref = client.V1ObjectFieldSelector(field_path="status.podIP"))
@@ -144,35 +145,29 @@ def _create_container_object(name, image, always_pull, env={}, container_ports=[
# If a health check is specified, create a readiness/liveness probe
# (For an HTTP-based check, we assume it's at the first container port)
- probe = None
- live_probe = None
- if readiness:
- hc_port = None
- if len(container_ports) > 0:
- (hc_port, proto) = container_ports[0]
- probe = _create_probe(readiness, hc_port)
- if liveness:
- hc_port = None
- if len(container_ports) > 0:
- (hc_port, proto) = container_ports[0]
- live_probe = _create_probe(liveness, hc_port)
- if resources:
- resources_obj = _create_resources(resources)
- else:
- resources_obj = None
+ readiness = kwargs.get('readiness')
+ liveness = kwargs.get('liveness')
+ resources = kwargs.get('resources')
+ container_ports = kwargs.get('container_ports') or []
+ hc_port = container_ports[0][0] if container_ports else None
+ probe = _create_probe(readiness, hc_port) if readiness else None
+ live_probe = _create_probe(liveness, hc_port) if liveness else None
+ resources_obj = _create_resources(resources) if resources else None
+ port_objs = [client.V1ContainerPort(container_port=port, protocol=proto)
+ for port, proto in container_ports]
# Define container for pod
return client.V1Container(
image_pull_policy='Always' if always_pull else 'IfNotPresent',
- ports=[client.V1ContainerPort(container_port=p, protocol=proto) for (p, proto) in container_ports],
- volume_mounts = volume_mounts,
- resources = resources_obj,
- readiness_probe = probe,
- liveness_probe = live_probe
+ ports=port_objs,
+ volume_mounts=kwargs.get('volume_mounts') or [],
+ resources=resources_obj,
+ readiness_probe=probe,
+ liveness_probe=live_probe
def _create_deployment_object(component_name,
@@ -274,6 +269,75 @@ def _parse_volumes(volume_list):
return volumes, volume_mounts
+def _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, log_info, filebeat):
+ if not log_info or not filebeat:
+ return
+ log_dir = log_info.get("log_directory")
+ if not log_dir:
+ return
+ sidecar_volume_mounts = []
+ # Create the volume for component log files and volume mounts for the component and sidecar containers
+ volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
+ volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
+ sc_path = log_info.get("alternate_fb_path") or "{0}/{1}".format(filebeat["log_path"], component_name)
+ sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
+ # Create the volume for sidecar data and the volume mount for it
+ volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
+ sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=filebeat["data_path"]))
+ # Create the volume for the sidecar configuration data and the volume mount for it
+ # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
+ volumes.append(
+ client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=filebeat["config_map"])))
+ sidecar_volume_mounts.append(
+ client.V1VolumeMount(name="filebeat-conf", mount_path=filebeat["config_path"], sub_path=filebeat["config_subpath"]))
+ # Finally create the container for the sidecar
+ containers.append(_create_container_object("filebeat", filebeat["image"], False, volume_mounts=sidecar_volume_mounts))
+def _add_tls_init_container(init_containers, volumes, volume_mounts, tls_info, tls_config):
+ # Two different ways of doing this, depending on whether the container will act as a TLS server or as a client only
+ # If a server, then tls_info will be passed, and tls_info["use_tls"] will be set to true. We create an InitContainer
+ # that sets up the CA cert, the server cert, and the keys.
+ # If a client only, only the CA cert is needed. We mount the CA cert from a ConfigMap that has been created as part
+ # of the installation process. If there is cert_directory information in tls_info, we use that directory in the mount path.
+ # Otherwise, we use the configured default path in tls_config.
+ cert_directory = None
+ if tls_info:
+ cert_directory = tls_info.get("cert_directory")
+ if cert_directory and tls_info.get("use_tls"):
+ # Use an InitContainer to set up the certificate information
+ # Create the certificate volume and volume mounts
+ volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
+ volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
+ init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
+ # Just create the init container
+ init_containers.append(_create_container_object("init-tls", tls_config["image"], False, volume_mounts=init_volume_mounts))
+ return
+ # Use a config map
+ # Create the CA cert volume
+ volumes.append(client.V1Volume(name="tls-cacert", config_map=client.V1ConfigMapVolumeSource(name=tls_config["ca_cert_configmap"])))
+ # Create the volume mount
+ mount_path = cert_directory or os.path.dirname(tls_config["component_ca_cert_path"])
+ volume_mounts.append(client.V1VolumeMount(name="tls-cacert", mount_path=mount_path))
+def _process_port_map(port_map):
+ service_ports = [] # Ports exposed internally on the k8s network
+ exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
+ for (cport, proto), hport in port_map.items():
+ name = "xport-{0}-{1}".format(proto[0].lower(), cport)
+ cport = int(cport)
+ hport = int(hport)
+ service_ports.append(client.V1ServicePort(port=cport, protocol=proto, name=name[1:]))
+ if hport != 0:
+ exposed_ports.append(client.V1ServicePort(port=cport, protocol=proto, node_port=hport, name=name))
+ return service_ports, exposed_ports
def _service_exists(location, namespace, component_name):
exists = False
@@ -353,7 +417,7 @@ def _execute_command_in_pod(location, namespace, pod_name, command):
return {"pod" : pod_name, "output" : output}
-def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, resources, **kwargs):
+def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, **kwargs):
This will create a k8s Deployment and, if needed, one or two k8s Services.
(We are being opinionated in our use of k8s... this code decides what k8s abstractions and features to use.
@@ -394,6 +458,9 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
{"use_tls": true, "cert_directory": "/path/to/container/cert/directory" }
- labels: dict with label-name/label-value pairs, e.g. {"cfydeployment" : "lsdfkladflksdfsjkl", "cfynode":"mycomponent"}
These label will be set on all the pods deployed as a result of this deploy() invocation.
+ - resources: dict with optional "limits" and "requests" resource requirements, each a dict containing:
+ - cpu: number CPU usage, like 0.5
+ - memory: string memory requirement, like "2Gi"
- readiness: dict with health check info; if present, used to create a readiness probe for the main container. Includes:
- type: check is done by making http(s) request to an endpoint ("http", "https") or by exec'ing a script in the container ("script", "docker")
- interval: period (in seconds) between probes
@@ -408,7 +475,6 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
- path: the full path to the script to be executed in the container for "script" and "docker" types
- k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
deployment_ok = False
@@ -431,82 +497,28 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
container_ports, port_map = parse_ports(kwargs.get("ports", []))
# Parse the volumes list into volumes and volume_mounts for the deployment
- volumes, volume_mounts = _parse_volumes(kwargs.get("volumes",[]))
+ volumes, volume_mounts = _parse_volumes(kwargs.get("volumes", []))
# Initialize the list of containers that will be part of the pod
containers = []
init_containers = []
# Set up the ELK logging sidecar container, if needed
- log_info = kwargs.get("log_info")
- if log_info and "log_directory" in log_info:
- log_dir = log_info["log_directory"]
- fb = k8sconfig["filebeat"]
- sidecar_volume_mounts = []
- # Create the volume for component log files and volume mounts for the component and sidecar containers
- volumes.append(client.V1Volume(name="component-log", empty_dir=client.V1EmptyDirVolumeSource()))
- volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=log_dir))
- sc_path = log_info["alternate_fb_path"] if "alternate_fb_path" in log_info \
- else "{0}/{1}".format(fb["log_path"], component_name)
- sidecar_volume_mounts.append(client.V1VolumeMount(name="component-log", mount_path=sc_path))
- # Create the volume for sidecar data and the volume mount for it
- volumes.append(client.V1Volume(name="filebeat-data", empty_dir=client.V1EmptyDirVolumeSource()))
- sidecar_volume_mounts.append(client.V1VolumeMount(name="filebeat-data", mount_path=fb["data_path"]))
- # Create the container for the sidecar
- containers.append(_create_container_object("filebeat", fb["image"], False, {}, [], sidecar_volume_mounts))
- # Create the volume for the sidecar configuration data and the volume mount for it
- # The configuration data is in a k8s ConfigMap that should be created when DCAE is installed.
- volumes.append(
- client.V1Volume(name="filebeat-conf", config_map=client.V1ConfigMapVolumeSource(name=fb["config_map"])))
- sidecar_volume_mounts.append(
- client.V1VolumeMount(name="filebeat-conf", mount_path=fb["config_path"], sub_path=fb["config_subpath"]))
+ _add_elk_logging_sidecar(containers, volumes, volume_mounts, component_name, kwargs.get("log_info"), k8sconfig.get("filebeat"))
# Set up TLS information
- # Two different ways of doing this, depending on whether the container will act as a TLS server or as a client only
- # If a server, then tls_info will be passed, and tls_info["use_tls"] will be set to true. We create an InitContainer
- # that sets up the CA cert, the server cert, and the keys.
- # If a client only, only the CA cert is needed. We mount the CA cert from a ConfigMap that has been created as part
- # of the installation process. If there is cert_directory information in tls_info, we use that directory in the mount path.
- # Otherwise, we use the configured default path in tls_config.
- tls_info = kwargs.get("tls_info")
- tls_config = k8sconfig["tls"]
- tls_server = False
- cert_directory = None
- if tls_info and "cert_directory" in tls_info and len(tls_info["cert_directory"]) > 0:
- cert_directory = tls_info["cert_directory"]
- if tls_info and tls_info.get("use_tls", False):
- tls_server = True
- # Use an InitContainer to set up the certificate information
- # Create the certificate volume and volume mounts
- volumes.append(client.V1Volume(name="tls-info", empty_dir=client.V1EmptyDirVolumeSource()))
- volume_mounts.append(client.V1VolumeMount(name="tls-info", mount_path=cert_directory))
- init_volume_mounts = [client.V1VolumeMount(name="tls-info", mount_path=tls_config["cert_path"])]
- # Create the init container
- init_containers.append(_create_container_object("init-tls", tls_config["image"], False, {}, [], init_volume_mounts))
- if not tls_server:
- # Use a config map
- # Create the CA cert volume
- volumes.append(client.V1Volume(name="tls-cacert", config_map=client.V1ConfigMapVolumeSource(name=tls_config["ca_cert_configmap"])))
- # Create the volume mount
- mount_path= cert_directory if cert_directory else os.path.dirname(tls_config["component_ca_cert_path"])
- volume_mounts.append(client.V1VolumeMount(name="tls-cacert", mount_path=mount_path))
+ _add_tls_init_container(init_containers, volumes, volume_mounts, kwargs.get("tls_info") or {}, k8sconfig.get("tls"))
# Create the container for the component
# Make it the first container in the pod
- containers.insert(0, _create_container_object(component_name, image, always_pull, kwargs.get("env", {}), container_ports, volume_mounts, resources, kwargs["readiness"], kwargs.get("liveness")))
+ container_args = {key: kwargs.get(key) for key in ("env", "readiness", "liveness", "resources")}
+ container_args['container_ports'] = container_ports
+ container_args['volume_mounts'] = volume_mounts
+ containers.insert(0, _create_container_object(component_name, image, always_pull, **container_args))
# Build the k8s Deployment object
labels = kwargs.get("labels", {})
- labels.update({"app": component_name})
+ labels["app"] = component_name
dep = _create_deployment_object(component_name, containers, init_containers, replicas, volumes, labels, pull_secrets=k8sconfig["image_pull_secrets"])
# Have k8s deploy it
@@ -516,12 +528,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
# Create service(s), if a port mapping is specified
if port_map:
- service_ports = [] # Ports exposed internally on the k8s network
- exposed_ports = [] # Ports to be mapped to ports on the k8s nodes via NodePort
- for (cport, proto), hport in port_map.items():
- service_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,name="port-{0}-{1}".format(proto[0].lower(), cport)))
- if int(hport) != 0:
- exposed_ports.append(client.V1ServicePort(port=int(cport),protocol=proto,node_port=int(hport),name="xport-{0}-{1}".format(proto[0].lower(),cport)))
+ service_ports, exposed_ports = _process_port_map(port_map)
# If there are ports to be exposed via MSB, set up the annotation for the service
msb_list = kwargs.get("msb_list")
@@ -534,7 +541,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
# If there are ports to be exposed on the k8s nodes, create a "NodePort" service
- if len(exposed_ports) > 0:
+ if exposed_ports:
exposed_service = \
_create_service_object(_create_exposed_service_name(component_name), component_name, exposed_ports, '', labels, "NodePort")
core.create_namespaced_service(namespace, exposed_service)