diff options
Diffstat (limited to 'k8s/k8splugin')
-rw-r--r-- | k8s/k8splugin/decorators.py | 8 | ||||
-rw-r--r-- | k8s/k8splugin/discovery.py | 16 | ||||
-rw-r--r-- | k8s/k8splugin/tasks.py | 165 |
3 files changed, 87 insertions, 102 deletions
diff --git a/k8s/k8splugin/decorators.py b/k8s/k8splugin/decorators.py index 59d14d8..b9b32bf 100644 --- a/k8s/k8splugin/decorators.py +++ b/k8s/k8splugin/decorators.py @@ -19,12 +19,14 @@ # ECOMP is a trademark and service mark of AT&T Intellectual Property. import copy + from cloudify import ctx from cloudify.exceptions import NonRecoverableError, RecoverableError + from k8splugin import discovery as dis -from k8splugin.exceptions import DockerPluginDeploymentError, \ - DockerPluginDependencyNotReadyError from k8splugin import utils +from k8splugin.exceptions import (DockerPluginDependencyNotReadyError, + DockerPluginDeploymentError) def monkeypatch_loggers(task_func): @@ -111,4 +113,4 @@ def wrap_error_handling_update(update_func): ctx.logger.error ("Unexpected error during update operation: {0}".format(str(e))) raise NonRecoverableError(e) - return wrapper
\ No newline at end of file + return wrapper diff --git a/k8s/k8splugin/discovery.py b/k8s/k8splugin/discovery.py index f3b87b6..56f8260 100644 --- a/k8s/k8splugin/discovery.py +++ b/k8s/k8splugin/discovery.py @@ -18,14 +18,14 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -from functools import partial import json import logging -import uuid -import requests -import consul import re +import uuid +from functools import partial +import consul +import requests logger = logging.getLogger("discovery") @@ -54,16 +54,16 @@ def _wrap_consul_call(consul_func, *args, **kwargs): def generate_service_component_name(service_component_type): """Generate service component id used to pass into the service component instance and used as the key to the service component configuration. - + Updated for use with Kubernetes. Sometimes the service component name gets used in Kubernetes in contexts (such as naming a Kubernetes Service) that - requires the name to conform to the RFC1035 DNS "label" syntax: + requires the name to conform to the RFC1035 DNS "label" syntax: -- starts with an alpha -- contains only of alphanumerics and "-" -- <= 63 characters long Format: - s<service component id>-<service component type>, + s<service component id>-<service component type>, truncated to 63 characters, "_" replaced with "-" in service_component_type, other non-conforming characters removed from service_component_type """ @@ -197,7 +197,7 @@ def is_healthy(consul_host, instance): def add_to_entry(conn, key, add_name, add_value): """ - Find 'key' in consul. + Find 'key' in consul. Treat its value as a JSON string representing a dict. Extend the dict by adding an entry with key 'add_name' and value 'add_value'. Turn the resulting extended dict into a JSON string. diff --git a/k8s/k8splugin/tasks.py b/k8s/k8splugin/tasks.py index 4205122..c9df9f4 100644 --- a/k8s/k8splugin/tasks.py +++ b/k8s/k8splugin/tasks.py @@ -24,6 +24,7 @@ import cloudify_importer import time, copy +import json from cloudify import ctx from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError, RecoverableError @@ -103,7 +104,7 @@ def _done_for_create(**kwargs): @Policies.gather_policies_to_node() @operation def create_for_components(**create_inputs): - """Create step for Docker containers that are components + """Create step for service components This interface is responsible for: @@ -185,7 +186,7 @@ def _setup_for_discovery_streams(**kwargs): @Policies.gather_policies_to_node() @operation def create_for_components_with_streams(**create_inputs): - """Create step for Docker containers that are components that use DMaaP + """Create step for service components that use DMaaP This interface is responsible for: @@ -206,7 +207,7 @@ def create_for_components_with_streams(**create_inputs): @monkeypatch_loggers @operation def create_for_platforms(**create_inputs): - """Create step for Docker containers that are platform components + """Create step for platform components This interface is responible for: @@ -231,8 +232,8 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST, return results[0]["ServiceAddress"] -def _verify_container(service_component_name, max_wait): - """Verify that the container is healthy +def _verify_k8s_deployment(service_component_name, max_wait): + """Verify that the k8s Deployment is ready Args: ----- @@ -241,7 +242,7 @@ def _verify_container(service_component_name, max_wait): Return: ------- - True if component is healthy else a DockerPluginDeploymentError exception + True if deployment is ready else a DockerPluginDeploymentError exception will be raised. """ num_attempts = 1 @@ -253,7 +254,7 @@ def _verify_container(service_component_name, max_wait): num_attempts += 1 if max_wait > 0 and max_wait < num_attempts: - raise DockerPluginDeploymentError("Container never became healthy") + raise DockerPluginDeploymentError("k8s deployment never became ready for {0}".format(service_component_name)) time.sleep(1) @@ -284,7 +285,7 @@ def _create_and_start_container(container_name, image, **kwargs): env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME, "CONFIG_BINDING_SERVICE": "config-binding-service" } env.update(kwargs.get("envs", {})) - ctx.logger.info("Deploying {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs)) + ctx.logger.info("Starting k8s deployment for {}, image: {}, env: {}, kwargs: {}".format(container_name, image, env, kwargs)) ctx.logger.info("Passing k8sconfig: {}".format(plugin_conf)) replicas = kwargs.get("replicas", 1) _,dep = k8sclient.deploy(DCAE_NAMESPACE, @@ -304,7 +305,7 @@ def _create_and_start_container(container_name, image, **kwargs): # Capture the result of deployment for future use ctx.instance.runtime_properties["k8s_deployment"] = dep ctx.instance.runtime_properties["replicas"] = replicas - ctx.logger.info ("Deployment complete: {0}".format(dep)) + ctx.logger.info ("k8s deployment initiated successfully for {0}: {1}".format(container_name, dep)) def _parse_cloudify_context(**kwargs): """Parse Cloudify context @@ -384,24 +385,17 @@ def _create_and_start_component(**kwargs): "readiness": kwargs.get("readiness",{})} _create_and_start_container(service_component_name, image, **sub_kwargs) - # TODO: Use regular logging here - ctx.logger.info("Container started: {0}".format(service_component_name)) - return kwargs def _verify_component(**kwargs): - """Verify component (container) is healthy""" + """Verify deployment is ready""" service_component_name = kwargs[SERVICE_COMPONENT_NAME] max_wait = kwargs.get("max_wait", 300) + ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name)) - # Verify that the container is healthy - - if _verify_container(service_component_name, max_wait): - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - - # TODO: Use regular logging here - ctx.logger.info("Container is healthy: {0}".format(service_component_name)) + if _verify_k8s_deployment(service_component_name, max_wait): + ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name)) return kwargs @@ -426,11 +420,11 @@ def _setup_msb_registration(service_name, msb_reg): @monkeypatch_loggers @operation def create_and_start_container_for_components(**start_inputs): - """Create Docker container and start for components + """Initiate Kubernetes deployment for service components - This operation method is to be used with the DockerContainerForComponents - node type. After launching the container, the plugin will verify with Consul - that the app is up and healthy before terminating. + This operation method is to be used with the ContainerizedServiceComponent + node type. After initiating a Kubernetes deployment, the plugin will verify with Kubernetes + that the app is up and responding successfully to readiness probes. """ _done_for_start( **_verify_component( @@ -467,11 +461,11 @@ def _update_delivery_url(**kwargs): @monkeypatch_loggers @operation def create_and_start_container_for_components_with_streams(**start_inputs): - """Create Docker container and start for components that have streams + """Initiate Kubernetes deployment for service components that have streams - This operation method is to be used with the DockerContainerForComponents - node type. After launching the container, the plugin will verify with Consul - that the app is up and healthy before terminating. + This operation method is to be used with the ContainerizedServiceComponentUsingDmaap + node type. After initiating the Kubernetes deployment, the plugin will verify with + Kubernetes that the app is up and responding successfully to readiness probes. """ _done_for_start( **_update_delivery_url( @@ -485,7 +479,7 @@ def create_and_start_container_for_components_with_streams(**start_inputs): @monkeypatch_loggers @operation def create_and_start_container_for_platforms(**kwargs): - """Create Docker container and start for platform services + """Initiate Kubernetes deployment for platform components This operation method is to be used with the ContainerizedPlatformComponent node type. @@ -538,21 +532,20 @@ def create_and_start_container_for_platforms(**kwargs): kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"] _create_and_start_container(service_component_name, image, **kwargs) - ctx.logger.info("Container started: {0}".format(service_component_name)) - - # Verify that the container is healthy + # Verify that the k8s deployment is ready max_wait = kwargs.get("max_wait", 300) + ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name)) - if _verify_container(service_component_name, max_wait): - ctx.logger.info("Container is healthy: {0}".format(service_component_name)) + if _verify_k8s_deployment(service_component_name, max_wait): + ctx.logger.info("k8s deployment ready for: {0}".format(service_component_name)) @wrap_error_handling_start @monkeypatch_loggers @operation def create_and_start_container(**kwargs): - """Create Docker container and start""" + """Initiate a Kubernetes deployment for the generic ContainerizedApplication node type""" service_component_name = ctx.node.properties["name"] ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name @@ -560,19 +553,16 @@ def create_and_start_container(**kwargs): _create_and_start_container(service_component_name, image,**kwargs) - ctx.logger.info("Component deployed: {0}".format(service_component_name)) - - @monkeypatch_loggers @operation def stop_and_remove_container(**kwargs): - """Stop and remove Docker container""" + """Delete Kubernetes deployment""" try: deployment_description = ctx.instance.runtime_properties["k8s_deployment"] k8sclient.undeploy(deployment_description) except Exception as e: - ctx.logger.error("Unexpected error while stopping container: {0}" + ctx.logger.error("Unexpected error while deleting k8s deployment: {0}" .format(str(e))) @wrap_error_handling_update @@ -580,41 +570,46 @@ def stop_and_remove_container(**kwargs): @operation def scale(replicas, **kwargs): """Change number of replicas in the deployment""" + service_component_name = ctx.instance.runtime_properties["service_component_name"] + if replicas > 0: current_replicas = ctx.instance.runtime_properties["replicas"] - ctx.logger.info("Scaling from {0} to {1}".format(current_replicas, replicas)) + ctx.logger.info("Scaling {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas)) deployment_description = ctx.instance.runtime_properties["k8s_deployment"] k8sclient.scale(deployment_description, replicas) ctx.instance.runtime_properties["replicas"] = replicas # Verify that the scaling took place as expected max_wait = kwargs.get("max_wait", 300) - service_component_name = ctx.instance.runtime_properties["service_component_name"] - if _verify_container(service_component_name, max_wait): - ctx.logger.info("Scaling complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_replicas, replicas)) + ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name)) + if _verify_k8s_deployment(service_component_name, max_wait): + ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas)) else: - ctx.logger.info("Ignoring request to scale to zero replicas") + ctx.logger.info("Ignoring request to scale {0} to zero replicas".format(service_component_name)) @wrap_error_handling_update @monkeypatch_loggers @operation def update_image(image, **kwargs): + """ Restart component with a new Docker image """ + + service_component_name = ctx.instance.runtime_properties["service_component_name"] if image: current_image = ctx.instance.runtime_properties["image"] - ctx.logger.info("Updating application container image from {0} to {1}".format(current_image, image)) + ctx.logger.info("Updating app image for {0} from {1} to {2}".format(service_component_name, current_image, image)) deployment_description = ctx.instance.runtime_properties["k8s_deployment"] k8sclient.upgrade(deployment_description, image) ctx.instance.runtime_properties["image"] = image # Verify that the update took place as expected max_wait = kwargs.get("max_wait", 300) - service_component_name = ctx.instance.runtime_properties["service_component_name"] - if _verify_container(service_component_name, max_wait): - ctx.logger.info("Update complete : {0} from {1} to {2} instance(s)".format(service_component_name, current_image, image)) + ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name)) + if _verify_k8s_deployment(service_component_name, max_wait): + ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image)) else: - ctx.logger.info("Ignoring update_image request with unusable image '{0}'".format(str(image))) + ctx.logger.info("Ignoring update_image request for {0} with unusable image '{1}'".format(service_component_name, str(image))) #TODO: implement rollback operation when kubernetes python client fix is available. # (See comments in k8sclient.py.) @@ -636,56 +631,40 @@ def cleanup_discovery(**kwargs): def _notify_container(**kwargs): - """Notify container using the policy section in the docker_config""" + """ + Notify container using the policy section in the docker_config. + Notification consists of running a script in the application container + in each pod in the Kubernetes deployment associated with this node. + Return the list of notification results. + """ dc = kwargs["docker_config"] + resp = [] if "policy" in dc: if dc["policy"]["trigger_type"] == "docker": - pass - """ - Need replacement for this in kubernetes. - Need to find all the pods that have been deployed - and execute the script in them. - Kubernetes does not appear to have a way to ask for a script - to be executed in all of the currently running pods for a - Kubernetes Deployment or ReplicaSet. We will have to find - each of them and run the script. The problem is that set of - pods could be changing. We can query to get all the pods, but - there's no guarantee the list won't change while we're trying to - execute the script. - - In ONAP R2, all of the policy-driven components rely on polling. - """ - """ - # REVIEW: Need to finalize on the docker config policy data structure + + # Build the command to execute in the container + # SCRIPT_PATH policies {"policies" : ...., "updated_policies" : ..., "removed_policies": ...} script_path = dc["policy"]["script_path"] - updated_policies = kwargs["updated_policies"] - removed_policies = kwargs["removed_policies"] - policies = kwargs["policies"] - cmd = doc.build_policy_update_cmd(script_path, use_sh=False, - msg_type="policies", - updated_policies=updated_policies, - removed_policies=removed_policies, - policies=policies - ) - - docker_host = kwargs[SELECTED_CONTAINER_DESTINATION] - docker_host_ip = _lookup_service(docker_host) - logins = _get_docker_logins() - client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) - - container_id = kwargs["container_id"] - - doc.notify_for_policy_update(client, container_id, cmd) - """ - # else the default is no trigger + policy_data = { + "policies": kwargs["policies"], + "updated_policies": kwargs["updated_policies"], + "removed_policies": kwargs["removed_policies"] + } - return kwargs + command = [script_path, "policies", json.dumps(policy_data)] + + # Execute the command + deployment_description = ctx.instance.runtime_properties["k8s_deployment"] + resp = k8sclient.execute_command_in_deployment(deployment_description, command) + # else the default is no trigger + + return resp +@operation @monkeypatch_loggers @Policies.update_policies_on_node() -@operation def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs): """Policy update task @@ -696,9 +675,13 @@ def policy_update(updated_policies, removed_policies=None, policies=None, **kwar :updated_policies: contains the list of changed policy-configs when configs_only=True (default) Use configs_only=False to bring the full policy objects in :updated_policies:. """ + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] + ctx.logger.info("policy_update for {0}-- updated_policies: {1}, removed_policies: {2}, policies: {3}" + .format(service_component_name, updated_policies, removed_policies, policies)) update_inputs = copy.deepcopy(ctx.instance.runtime_properties) update_inputs["updated_policies"] = updated_policies update_inputs["removed_policies"] = removed_policies update_inputs["policies"] = policies - _notify_container(**update_inputs) + resp = _notify_container(**update_inputs) + ctx.logger.info("policy_update complete for {0}--notification results: {1}".format(service_component_name,json.dumps(resp))) |