From a3f62fa65e34b2dd1130ea8dd647a98e0575a330 Mon Sep 17 00:00:00 2001 From: Jack Lucas Date: Mon, 11 Mar 2019 17:43:37 -0400 Subject: Add multi-cluster support Issue-ID: DCAEGEN2-1136 Change-Id: I314e5d8c501198b3e87c45813201498935c7bacc Signed-off-by: Jack Lucas --- k8s/ChangeLog.md | 3 +++ k8s/configure/configure.py | 30 +++++++++++++------------ k8s/k8s-node-type.yaml | 13 +++++++++-- k8s/k8sclient/k8sclient.py | 53 +++++++++++++++++++++++++-------------------- k8s/k8splugin/tasks.py | 36 +++++++++++++++++++++++------- k8s/pom.xml | 2 +- k8s/setup.py | 2 +- k8s/tests/test_k8sclient.py | 4 ++-- k8s/tests/test_tasks.py | 10 ++++----- 9 files changed, 97 insertions(+), 56 deletions(-) diff --git a/k8s/ChangeLog.md b/k8s/ChangeLog.md index a59a016..4faaaf7 100644 --- a/k8s/ChangeLog.md +++ b/k8s/ChangeLog.md @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.4.10] + Support for deploying to multiple Kubernetes clusters. + ## [1.4.9] * Support for liveness probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/) * fix the readiness probe to run script such as "/opt/app/snmptrap/bin/snmptrapd.sh status" diff --git a/k8s/configure/configure.py b/k8s/configure/configure.py index de196c0..e15939a 100644 --- a/k8s/configure/configure.py +++ b/k8s/configure/configure.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # org.onap.dcae # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ _CONSUL_KEY = "k8s-plugin" # Key under which CM configuration is st # Default configuration values DCAE_NAMESPACE = "dcae" CONSUL_DNS_NAME = "consul" +DEFAULT_K8S_LOCATION = "central" FB_LOG_PATH = "/var/log/onap" FB_DATA_PATH = "/usr/share/filebeat/data" @@ -38,20 +39,21 @@ TLS_IMAGE = "nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.tls-init-c def _set_defaults(): """ Set default configuration parameters """ return { - "namespace" : DCAE_NAMESPACE, # k8s namespace to use for DCAE - "consul_dns_name" : CONSUL_DNS_NAME, # k8s internal DNS name for Consul - "image_pull_secrets" : [], # list of k8s secrets for accessing Docker registries - "filebeat": { # Configuration for setting up filebeat container - "log_path" : FB_LOG_PATH, # mount point for log volume in filebeat container - "data_path" : FB_DATA_PATH, # mount point for data volume in filebeat container - "config_path" : FB_CONFIG_PATH, # mount point for config volume in filebeat container - "config_subpath" : FB_CONFIG_SUBPATH, # subpath for config data in filebeat container - "config_map" : FB_CONFIG_MAP, # ConfigMap holding the filebeat configuration - "image": FB_IMAGE # Docker image to use for filebeat + "namespace" : DCAE_NAMESPACE, # k8s namespace to use for DCAE + "consul_dns_name" : CONSUL_DNS_NAME, # k8s internal DNS name for Consul + "default_k8s_location" : DEFAULT_K8S_LOCATION, # default k8s location to deploy components + "image_pull_secrets" : [], # list of k8s secrets for accessing Docker registries + "filebeat": { # Configuration for setting up filebeat container + "log_path" : FB_LOG_PATH, # mount point for log volume in filebeat container + "data_path" : FB_DATA_PATH, # mount point for data volume in filebeat container + "config_path" : FB_CONFIG_PATH, # mount point for config volume in filebeat container + "config_subpath" : FB_CONFIG_SUBPATH, # subpath for config data in filebeat container + "config_map" : FB_CONFIG_MAP, # ConfigMap holding the filebeat configuration + "image": FB_IMAGE # Docker image to use for filebeat }, - "tls": { # Configuration for setting up TLS init container - "cert_path" : TLS_CERT_PATH, # mount point for certificate volume in TLS init container - "image": TLS_IMAGE # Docker image to use for TLS init container + "tls": { # Configuration for setting up TLS init container + "cert_path" : TLS_CERT_PATH, # mount point for certificate volume in TLS init container + "image": TLS_IMAGE # Docker image to use for TLS init container } } diff --git a/k8s/k8s-node-type.yaml b/k8s/k8s-node-type.yaml index c803b81..5a19e8a 100644 --- a/k8s/k8s-node-type.yaml +++ b/k8s/k8s-node-type.yaml @@ -25,7 +25,7 @@ plugins: k8s: executor: 'central_deployment_agent' package_name: k8splugin - package_version: 1.4.9 + package_version: 1.4.10 data_types: @@ -126,7 +126,6 @@ node_types: Please specify "requests" property and/or a "limits" property, with subproproperties for cpu and memory. (https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) - log_info: type: dcae.types.LoggingInfo description: > @@ -153,6 +152,16 @@ node_types: not already present on the Docker host where the container is being launched. default: false + location_id: + type: string + description: > + The identifier for the location where the component is to be deployed. + If absent, the plugin uses its configured default location, typically the location + where the plugin is running (the central site). Also used to supply a location to + the DMaaP bus controller if the component is being provisioned as a publisher or + subscriber to a DMaaP feed or topic. + required: false + interfaces: dcae.interfaces.update: scale: diff --git a/k8s/k8sclient/k8sclient.py b/k8s/k8sclient/k8sclient.py index d3417a7..ee4250d 100644 --- a/k8s/k8sclient/k8sclient.py +++ b/k8s/k8sclient/k8sclient.py @@ -27,6 +27,9 @@ from kubernetes import config, client, stream PROBE_DEFAULT_PERIOD = 15 PROBE_DEFAULT_TIMEOUT = 1 +# Location of k8s cluster config file ("kubeconfig") +K8S_CONFIG_PATH="/opt/onap/kubeconfig" + # Regular expression for interval/timeout specification INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$") # Conversion factors to seconds @@ -48,11 +51,10 @@ def _create_service_name(component_name): def _create_exposed_service_name(component_name): return ("x{0}".format(component_name))[:63] -def _configure_api(): - # Look for a kubernetes config file in ~/.kube/config - kubepath = os.path.join(os.environ["HOME"], '.kube/config') - if os.path.exists(kubepath): - config.load_kube_config(kubepath) +def _configure_api(location=None): + # Look for a kubernetes config file + if os.path.exists(K8S_CONFIG_PATH): + config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False) else: # Maybe we're running in a k8s container and we can use info provided by k8s # We would like to use: @@ -271,10 +273,10 @@ def _parse_volumes(volume_list): return volumes, volume_mounts -def _service_exists(namespace, component_name): +def _service_exists(location, namespace, component_name): exists = False try: - _configure_api() + _configure_api(location) client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace) exists = True except client.rest.ApiException: @@ -282,13 +284,14 @@ def _service_exists(namespace, component_name): return exists -def _patch_deployment(namespace, deployment, modify): +def _patch_deployment(location, namespace, deployment, modify): ''' - Gets the current spec for 'deployment' in 'namespace', + Gets the current spec for 'deployment' in 'namespace' + in the k8s cluster at 'location', uses the 'modify' function to change the spec, then sends the updated spec to k8s. ''' - _configure_api() + _configure_api(location) # Get deployment spec spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace) @@ -299,10 +302,11 @@ def _patch_deployment(namespace, deployment, modify): # Patch the deploy with updated spec client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec) -def _execute_command_in_pod(namespace, pod_name, command): +def _execute_command_in_pod(location, namespace, pod_name, command): ''' Execute the command (specified by an argv-style list in the "command" parameter) in - the specified pod in the specified namespace. For now at least, we use this only to + the specified pod in the specified namespace at the specified location. + For now at least, we use this only to run a notification script in a pod after a configuration change. The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec". @@ -323,7 +327,7 @@ def _execute_command_in_pod(namespace, pod_name, command): The "stream" approach returns a string containing any output sent by the command to stdout or stderr. We'll return that so it can logged. ''' - _configure_api() + _configure_api(location) try: output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec, name=pod_name, @@ -399,6 +403,8 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r - timeout: time (in seconds) to allow a probe to complete - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types - path: the full path to the script to be executed in the container for "script" and "docker" types + - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed + ''' @@ -406,6 +412,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r cip_service_created = False deployment_description = { "namespace": namespace, + "location" : kwargs.get("k8s_location"), "deployment": '', "services": [] } @@ -413,7 +420,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r try: # Get API handles - _configure_api() + _configure_api(kwargs.get("k8s_location")) core = client.CoreV1Api() ext = client.ExtensionsV1beta1Api() @@ -523,7 +530,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r return dep, deployment_description def undeploy(deployment_description): - _configure_api() + _configure_api(deployment_description["location"]) namespace = deployment_description["namespace"] @@ -535,8 +542,8 @@ def undeploy(deployment_description): options = client.V1DeleteOptions(propagation_policy="Foreground") client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options) -def is_available(namespace, component_name): - _configure_api() +def is_available(location, namespace, component_name): + _configure_api(location) dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace) # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation @@ -549,7 +556,7 @@ def scale(deployment_description, replicas): spec.spec.replicas = replicas return spec - _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count) + _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count) def upgrade(deployment_description, image, container_index = 0): ''' Trigger a rolling upgrade by sending a new image name/tag to k8s ''' @@ -558,7 +565,7 @@ def upgrade(deployment_description, image, container_index = 0): spec.spec.template.spec.containers[container_index].image = image return spec - _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image) + _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image) def rollback(deployment_description, rollback_to=0): ''' @@ -576,7 +583,7 @@ def rollback(deployment_description, rollback_to=0): - https://github.com/kubernetes/kubernetes/pull/63837 The fix has been merged into the master branch but is not in the latest release. ''' - _configure_api() + _configure_api(deployment_description["location"]) deployment = deployment_description["deployment"] namespace = deployment_description["namespace"] @@ -620,8 +627,8 @@ def execute_command_in_deployment(deployment_description, command): the pod that has the k8s deployment name. To list the pods, the code below queries for pods with the label carrying the deployment name. ''' - - _configure_api() + location = deployment_description["location"] + _configure_api(location) deployment = deployment_description["deployment"] namespace = deployment_description["namespace"] @@ -633,7 +640,7 @@ def execute_command_in_deployment(deployment_description, command): ).items] def do_execute(pod_name): - return _execute_command_in_pod(namespace, pod_name, command) + return _execute_command_in_pod(location, namespace, pod_name, command) # Execute command in the running pods return map(do_execute, pod_names) diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py index 399bc9f..fdb00c5 100644 --- a/k8s/k8splugin/tasks.py +++ b/k8s/k8splugin/tasks.py @@ -43,6 +43,7 @@ CONSUL_HOST = plugin_conf.get("consul_host") CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name") DCAE_NAMESPACE = plugin_conf.get("namespace") DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800) +DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location") # Used to construct delivery urls for data router subscribers. Data router in FTL # requires https but this author believes that ONAP is to be defaulted to http. @@ -54,6 +55,7 @@ CONTAINER_ID = "container_id" APPLICATION_CONFIG = "application_config" K8S_DEPLOYMENT = "k8s_deployment" RESOURCE_KW = "resource_config" +LOCATION_ID = "location_id" # Utility methods @@ -106,6 +108,11 @@ def _get_resources(**kwargs): ctx.logger.info("set resources to None") return None +def _get_location(): + ''' Get the k8s location property. Set to the default if the property is missing, None, or zero-length ''' + return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \ + else DEFAULT_K8S_LOCATION + @merge_inputs_for_create @monkeypatch_loggers @Policies.gather_policies_to_node() @@ -238,11 +245,13 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST, else: return results[0]["ServiceAddress"] -def _verify_k8s_deployment(service_component_name, max_wait): +def _verify_k8s_deployment(location, service_component_name, max_wait): """Verify that the k8s Deployment is ready Args: ----- + location (string): location of the k8s cluster where the component was deployed + service_component_name: component's service component name max_wait (integer): limit to how may attempts to make which translates to seconds because each sleep is one second. 0 means infinite. @@ -253,7 +262,7 @@ def _verify_k8s_deployment(service_component_name, max_wait): num_attempts = 1 while True: - if k8sclient.is_available(DCAE_NAMESPACE, service_component_name): + if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name): return True else: num_attempts += 1 @@ -287,6 +296,7 @@ def _create_and_start_container(container_name, image, **kwargs): - replicas: number of replicas to be launched initially - readiness: object with information needed to create a readiness check - liveness: object with information needed to create a liveness check + - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed ''' env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME, "CONFIG_BINDING_SERVICE": "config-binding-service" } @@ -310,7 +320,8 @@ def _create_and_start_container(container_name, image, **kwargs): labels = kwargs.get("labels", {}), log_info=kwargs.get("log_info"), readiness=kwargs.get("readiness"), - liveness=kwargs.get("liveness")) + liveness=kwargs.get("liveness"), + k8s_location=kwargs.get("k8s_location")) # Capture the result of deployment for future use ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep @@ -333,7 +344,7 @@ def _parse_cloudify_context(**kwargs): "cfynodeinstance": ctx.instance.id[:63] } - # Pick up the centralized logging info + # Pick up the centralized logging info if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]: kwargs["log_info"] = ctx.node.properties["log_info"] @@ -347,6 +358,9 @@ def _parse_cloudify_context(**kwargs): if "always_pull_image" in ctx.node.properties: kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] + # Pick up location + kwargs["k8s_location"] = _get_location() + return kwargs def _enhance_docker_params(**kwargs): @@ -402,7 +416,8 @@ def _create_and_start_component(**kwargs): "labels": kwargs.get("labels", {}), "resource_config": kwargs.get("resource_config",{}), "readiness": kwargs.get("readiness",{}), - "liveness": kwargs.get("liveness",{})} + "liveness": kwargs.get("liveness",{}), + "k8s_location": kwargs.get("k8s_location")} returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs) kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT] @@ -415,7 +430,7 @@ def _verify_component(**kwargs): max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name)) - if _verify_k8s_deployment(service_component_name, max_wait): + if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait): ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name)) else: # The component did not become ready within the "max_wait" interval. @@ -569,6 +584,10 @@ def create_and_start_container_for_platforms(**kwargs): kwargs["replicas"] = ctx.node.properties["replicas"] if "always_pull_image" in ctx.node.properties: kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] + + # Pick up location + kwargs["k8s_location"] = _get_location() + returned_args = _create_and_start_container(service_component_name, image, **kwargs) # Verify that the k8s deployment is ready @@ -586,6 +605,7 @@ def create_and_start_container(**kwargs): ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name image = ctx.node.properties["image"] + kwargs["k8s_location"] = _get_location() _create_and_start_container(service_component_name, image,**kwargs) @@ -624,7 +644,7 @@ def scale(replicas, **kwargs): # Verify that the scaling took place as expected max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name)) - if _verify_k8s_deployment(service_component_name, max_wait): + if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait): ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas)) else: @@ -647,7 +667,7 @@ def update_image(image, **kwargs): # Verify that the update took place as expected max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT) ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name)) - if _verify_k8s_deployment(service_component_name, max_wait): + if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait): ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image)) else: diff --git a/k8s/pom.xml b/k8s/pom.xml index 3bde42e..22db83c 100644 --- a/k8s/pom.xml +++ b/k8s/pom.xml @@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform.plugins k8s k8s-plugin - 1.4.8-SNAPSHOT + 1.4.10-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/k8s/setup.py b/k8s/setup.py index a64efc8..12f1f5e 100644 --- a/k8s/setup.py +++ b/k8s/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='k8splugin', description='Cloudify plugin for containerized components deployed using Kubernetes', - version="1.4.9", + version="1.4.10", author='J. F. Lucas, Michael Hwang, Tommy Carpenter', packages=['k8splugin','k8sclient','msb','configure'], zip_safe=False, diff --git a/k8s/tests/test_k8sclient.py b/k8s/tests/test_k8sclient.py index 2511239..43939ad 100644 --- a/k8s/tests/test_k8sclient.py +++ b/k8s/tests/test_k8sclient.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # org.onap.dcae # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -205,7 +205,7 @@ def test_deploy(monkeypatch): monkeypatch.setattr(ext,"create_namespaced_deployment", pseudo_deploy) return ext - def pseudo_configure(): + def pseudo_configure(loc): pass monkeypatch.setattr(k8sclient.k8sclient,"_configure_api", pseudo_configure) diff --git a/k8s/tests/test_tasks.py b/k8s/tests/test_tasks.py index cf78860..d56a443 100644 --- a/k8s/tests/test_tasks.py +++ b/k8s/tests/test_tasks.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # org.onap.dcae # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -194,21 +194,21 @@ def test_verify_container(monkeypatch, mockconfig): from k8splugin import tasks from k8splugin.exceptions import DockerPluginDeploymentError - def fake_is_available_success(ch, scn): + def fake_is_available_success(loc, ch, scn): return True monkeypatch.setattr(k8sclient, "is_available", fake_is_available_success) - assert tasks._verify_k8s_deployment("some-name", 3) + assert tasks._verify_k8s_deployment("some-location","some-name", 3) - def fake_is_available_never_good(ch, scn): + def fake_is_available_never_good(loc, ch, scn): return False monkeypatch.setattr(k8sclient, "is_available", fake_is_available_never_good) - assert not tasks._verify_k8s_deployment("some-name", 2) + assert not tasks._verify_k8s_deployment("some-location", "some-name", 2) def test_update_delivery_url(monkeypatch, mockconfig): -- cgit 1.2.3-korg