diff options
author | Michael Hwang <mhwang@research.att.com> | 2017-08-23 14:26:36 -0400 |
---|---|---|
committer | Michael Hwang <mhwang@research.att.com> | 2017-08-23 14:31:51 -0400 |
commit | 21af561cafe31681f94479e8c70f157f6e6ecc53 (patch) | |
tree | f41f69e1419867fd70af1f7697f78e3490d41b87 /docker/dockerplugin/tasks.py | |
parent | 94cbaca0f5d9447afe9b0f392f248470420422e5 (diff) |
Add docker and relationships plugin
Change-Id: I323599ae2965f081f2061b6791635bbeddb09811
Issue-Id: DCAEGEN2-79
Signed-off-by: Michael Hwang <mhwang@research.att.com>
Diffstat (limited to 'docker/dockerplugin/tasks.py')
-rw-r--r-- | docker/dockerplugin/tasks.py | 563 |
1 file changed, 563 insertions, 0 deletions
diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py new file mode 100644 index 0000000..a41f143 --- /dev/null +++ b/docker/dockerplugin/tasks.py @@ -0,0 +1,563 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +# Lifecycle interface calls for DockerContainer + +import json, time, copy +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError, RecoverableError +import dockering as doc +from dockerplugin import discovery as dis +from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ + merge_inputs_for_start +from dockerplugin.exceptions import DockerPluginDeploymentError, \ + DockerPluginDependencyNotReadyError +from dockerplugin import utils + +# TODO: Remove this Docker port hardcoding and query for this port instead +DOCKER_PORT = 2376 +# Always use the local Consul agent for interfacing with Consul from the plugin. +# Safe to assume that its always there. 
CONSUL_HOST = "localhost"

# Used to construct delivery urls for data router subscribers. Data router in
# FTL requires https but this author believes that ONAP is to be defaulted to
# http.
DEFAULT_SCHEME = "http"

# Keys used both as runtime property names and as keys in the kwargs dicts
# threaded through the chained create/start steps below
SERVICE_COMPONENT_NAME = "service_component_name"
SELECTED_CONTAINER_DESTINATION = "selected_container_destination"
CONTAINER_ID = "container_id"

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Push the component's application configuration into Consul.

    Expects "name" and "application_config" in kwargs. Returns kwargs
    unchanged so the create steps can be chained.

    Raises:
        RecoverableError: when the Consul connection fails (worth retrying)
        NonRecoverableError: for any other failure
    """
    try:
        name = kwargs['name']
        application_config = kwargs['application_config']

        # NOTE: application_config is no longer a json string and is input as
        # a YAML map which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
                .format(str(e)))
        raise NonRecoverableError(e)

def _generate_component_name(**kwargs):
    """Generate the service component name and store it in kwargs["name"].

    An explicit "service_component_name_override" property wins over the
    generated name. Returns kwargs for chaining.
    """
    service_component_type = kwargs['service_component_type']
    service_id = kwargs['service_id']
    location_id = kwargs['location_id']

    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
            else dis.generate_service_component_name(service_component_type,
                    service_id, location_id)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up the create operation by persisting all accumulated kwargs.

    All updates to the runtime_properties happen here. I don't see a reason
    why we shouldn't do this because the context is not being mutated by
    something else and it keeps the other functions pure (pure in the sense
    of not dealing with CloudifyContext).
    """
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs


@monkeypatch_loggers
@operation
def create_for_components(**kwargs):
    """Create step for Docker containers that are components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_generate_component_name(
                    **ctx.node.properties)))


def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin.

    The DMaaP plugin requires this plugin to set the runtime properties keyed
    by the node name, so every stream dict is stored into kwargs under its
    own name. Data router subscribers additionally get generated credentials
    because the DMaaP plugin doesn't generate them for subscribers; the
    generation code and length of username/password have been lifted from the
    DMaaP plugin. Returns kwargs for chaining.
    """
    # NOTE: Explicit for-loops here. Using map() purely for side effects is a
    # silent no-op under Python 3 because map() is lazy there.
    for stream in kwargs["streams_publishes"]:
        kwargs[stream["name"]] = stream

    for stream in kwargs["streams_subscribes"]:
        if stream["type"] == "data_router":
            # Don't want to mutate the source
            stream = copy.deepcopy(stream)
            if not stream.get("username", None):
                stream["username"] = utils.random_string(8)
            if not stream.get("password", None):
                stream["password"] = utils.random_string(10)

        # NOTE: The delivery url is constructed and set up in the start
        # operation
        kwargs[stream["name"]] = stream

    return kwargs

def _setup_for_discovery_streams(**kwargs):
    """Setup for discovery of streams

    Specifically, there's a race condition this call addresses for the data
    router subscriber case. The component needs its feed subscriber
    information but the DMaaP plugin doesn't provide this until after the
    docker plugin start operation.
    """
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
        conn = dis.create_kv_conn(CONSUL_HOST)

        def add_feed(dr_sub):
            # delivery url and subscriber id will be filled in by the dmaap
            # plugin later
            v = { "location": dr_sub["location"], "delivery_url": None,
                    "username": dr_sub["username"], "password": dr_sub["password"],
                    "subscriber_id": None }
            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) is not None

        try:
            if all(map(add_feed, dr_subs)):
                return kwargs
        except Exception as e:
            raise NonRecoverableError(e)

        # You should never get here
        raise NonRecoverableError("Failure updating feed streams in Consul")
    else:
        return kwargs


@monkeypatch_loggers
@operation
def create_for_components_with_streams(**kwargs):
    """Create step for Docker containers that are components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setup runtime properties for DMaaP plugin
    3. Populating application config into Consul
    4. Populating DMaaP config for data router subscribers in Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **_setup_for_discovery_streams(
                    **_parse_streams(
                        **_generate_component_name(
                            **ctx.node.properties)))))


@monkeypatch_loggers
@operation
def create_for_platforms(**kwargs):
    """Create step for Docker containers that are platform components

    This interface is responsible for:

    1. Populating config information into Consul
    """
    _done_for_create(
            **_setup_for_discovery(
                **ctx.node.properties))


def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
        with_port=False):
    """Resolve a service's address (optionally "address:port") via Consul.

    Only the first Consul result is used.
    """
    conn = dis.create_kv_conn(consul_host)
    results = dis.lookup_service(conn, service_component_name)

    if with_port:
        # Just grab first
        result = results[0]
        return "{address}:{port}".format(address=result["ServiceAddress"],
                port=result["ServicePort"])
    else:
        return results[0]["ServiceAddress"]


def _verify_container(service_component_name, max_wait, consul_host=CONSUL_HOST):
    """Verify that the container is healthy

    Args:
    -----
    max_wait (integer): limit to how many attempts to make which translates
        to seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if component is healthy else a DockerPluginDeploymentError exception
    will be raised.
    """
    num_attempts = 1

    while True:
        if dis.is_healthy(consul_host, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                raise DockerPluginDeploymentError("Container never became healthy")

            time.sleep(1)


def _create_and_start_container(container_name, image, docker_host,
        consul_host=CONSUL_HOST, **kwargs):
    """Create and start Docker container

    This is the function that actually does more of the heavy lifting
    including resolving the docker host to connect to and common things to do
    in setting up docker containers like making sure CONSUL_HOST gets set as
    the local docker host ip.

    This method raises DockerPluginDependencyNotReadyError
    """
    try:
        # Setup for Docker operations

        docker_host_ip = _lookup_service(docker_host, consul_host=consul_host)

        client = doc.create_client(docker_host_ip, DOCKER_PORT)

        hcp = doc.add_host_config_params_volumes(volumes=kwargs.get("volumes",
            None))
        hcp = doc.add_host_config_params_ports(ports=kwargs.get("ports", None),
                host_config_params=hcp)
        hcp = doc.add_host_config_params_dns(docker_host_ip,
                host_config_params=hcp)

        # NOTE: The critical env variable CONSUL_HOST is being assigned the
        # docker host ip itself because there should be a local Consul agent.
        # We want services to register with their local Consul agent.
        # CONFIG_BINDING_SERVICE is here for backwards compatibility. This is
        # a well-known name now.
        platform_envs = { "CONSUL_HOST": docker_host_ip,
                "CONFIG_BINDING_SERVICE": "config_binding_service" }
        # NOTE: The order of the envs being passed in is **important**. The
        # kwargs["envs"] getting passed in last ensures that manual overrides
        # will override the hardcoded envs.
        envs = doc.create_envs(container_name, platform_envs, kwargs.get("envs", {}))

        # Do Docker operations

        container = doc.create_container(client, image, container_name, envs, hcp)
        container_id = doc.start_container(client, container)

        return container_id
    except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
            dis.DiscoveryServiceNotFoundError) as e:
        raise DockerPluginDependencyNotReadyError(e)


def _enhance_docker_params(**kwargs):
    """Fold docker_config (healthcheck envs, ports, volumes) into kwargs.

    Returns kwargs for chaining.
    """
    docker_config = kwargs.get("docker_config", {})

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
            if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)
    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        # Add the lists of ports and volumes unintelligently - meaning just
        # add the lists together with no deduping.
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs

    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs

def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
    # Need to be picky and manually select out pieces because just using
    # kwargs which contains everything confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = { "volumes": kwargs.get("volumes", []),
            "ports": kwargs.get("ports", None), "envs": kwargs.get("envs", {}) }

    container_id = _create_and_start_container(service_component_name, image,
            docker_host, **sub_kwargs)
    kwargs[CONTAINER_ID] = container_id

    # TODO: Use regular logging here
    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))

    return kwargs

def _verify_component(**kwargs):
    """Verify component (container) is healthy"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # TODO: "Consul doesn't make its first health check immediately upon
    # registration. Instead it waits for the health check interval to pass."
    # Possible enhancement is to read the interval (and possibly the timeout)
    # from docker_config and multiply that by a number to come up with a more
    # suitable max_wait.
    max_wait = kwargs.get("max_wait", 300)

    # Verify that the container is healthy

    if _verify_container(service_component_name, max_wait):
        container_id = kwargs[CONTAINER_ID]
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]

        # TODO: Use regular logging here
        ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
            service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    """Wrap up the start operation by persisting all accumulated kwargs."""
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Create Docker container and start for components

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with
    Consul that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_verify_component(
                **_create_and_start_component(
                    **_enhance_docker_params(**start_inputs))))


def _update_delivery_url(**kwargs):
    """Update the delivery url for data router subscribers"""
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
            if s["type"] == "data_router"]

    if dr_subs:
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
        # TODO: Should NOT be setting up the delivery url with ip addresses
        # because in the https case, this will not work because data router
        # does a certificate validation using the fqdn.
        subscriber_host = _lookup_service(service_component_name, with_port=True)

        for dr_sub in dr_subs:
            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
            path = dr_sub["route"]
            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
                    scheme=scheme, host=subscriber_host, path=path)
            kwargs[dr_sub["name"]] = dr_sub

    return kwargs

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
    """Create Docker container and start for components that have streams

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with
    Consul that the app is up and healthy before terminating.
    """
    _done_for_start(
            **_update_delivery_url(
                **_verify_component(
                    **_create_and_start_component(
                        **_enhance_docker_params(**start_inputs)))))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Create Docker container and start for platform services

    This operation method is to be used with the DockerContainerForPlatforms
    node type. After launching the container, the plugin will verify with
    Consul that the app is up and healthy before terminating.
    """
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    service_component_name = ctx.node.properties["name"]

    docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
            if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)
    kwargs["envs"] = envs

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate to not
    # use this and not to set a specific port mapping in cases like service
    # change handler.
    if host_port != 0 and container_port != 0:
        # Doing this because other nodes might want to use this property
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")

    container_id = _create_and_start_container(service_component_name, image,
            docker_host, **kwargs)
    ctx.instance.runtime_properties[CONTAINER_ID] = container_id

    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))

    # Verify that the container is healthy

    max_wait = kwargs.get("max_wait", 300)

    if _verify_container(service_component_name, max_wait):
        ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
            service_component_name))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Create Docker container and start"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

    container_id = _create_and_start_container(service_component_name, image,
            docker_host, **kwargs)
    ctx.instance.runtime_properties[CONTAINER_ID] = container_id

    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))


@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Stop and remove Docker container

    Pass cleanup_image=True to also attempt removal of the container's image.
    """
    try:
        docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

        docker_host_ip = _lookup_service(docker_host)

        client = doc.create_client(docker_host_ip, DOCKER_PORT)

        container_id = ctx.instance.runtime_properties[CONTAINER_ID]
        doc.stop_then_remove_container(client, container_id)

        cleanup_image = kwargs.get("cleanup_image", False)

        if cleanup_image:
            image = ctx.node.properties["image"]

            if doc.remove_image(client, image):
                ctx.logger.info("Removed Docker image: {0}".format(image))
            else:
                ctx.logger.warn("Could not remove Docker image: {0}".format(image))
    except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
            dis.DiscoveryServiceNotFoundError) as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while stopping container: {0}"
                .format(str(e)))
        raise NonRecoverableError(e)

@monkeypatch_loggers
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

    try:
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.remove_service_component_config(conn, service_component_name)
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)


# Lifecycle interface calls for dcae.nodes.DockerHost

@operation
def select_docker_host(**kwargs):
    """Select the Docker host; only an explicit override is supported here."""
    selected_docker_host = ctx.node.properties['docker_host_override']

    if selected_docker_host:
        ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host
        ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host))
    else:
        raise NonRecoverableError("Failed to find a suitable Docker host")

@operation
def unselect_docker_host(**kwargs):
    """Forget the previously selected Docker host."""
    del ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("Unselected Docker host")