diff options
author | Michael Hwang <mhwang@research.att.com> | 2017-08-23 14:26:36 -0400 |
---|---|---|
committer | Michael Hwang <mhwang@research.att.com> | 2017-08-23 14:31:51 -0400 |
commit | 21af561cafe31681f94479e8c70f157f6e6ecc53 (patch) | |
tree | f41f69e1419867fd70af1f7697f78e3490d41b87 /docker/dockerplugin | |
parent | 94cbaca0f5d9447afe9b0f392f248470420422e5 (diff) |
Add docker and relationships plugin
Change-Id: I323599ae2965f081f2061b6791635bbeddb09811
Issue-Id: DCAEGEN2-79
Signed-off-by: Michael Hwang <mhwang@research.att.com>
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r-- | docker/dockerplugin/__init__.py | 30 | ||||
-rw-r--r-- | docker/dockerplugin/decorators.py | 80 | ||||
-rw-r--r-- | docker/dockerplugin/discovery.py | 206 | ||||
-rw-r--r-- | docker/dockerplugin/exceptions.py | 29 | ||||
-rw-r--r-- | docker/dockerplugin/tasks.py | 563 | ||||
-rw-r--r-- | docker/dockerplugin/utils.py | 28 |
6 files changed, 936 insertions, 0 deletions
diff --git a/docker/dockerplugin/__init__.py b/docker/dockerplugin/__init__.py new file mode 100644 index 0000000..ef1bfec --- /dev/null +++ b/docker/dockerplugin/__init__.py @@ -0,0 +1,30 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +# REVIEW: Tried to source the version from here but you run into import issues +# because "tasks" module is loaded. 
This method seems to be the PEP 396 +# recommended way and is listed #3 here https://packaging.python.org/single_source_version/ +# __version__ = '0.1.0' + +from .tasks import create_for_components, create_for_components_with_streams, \ + create_and_start_container_for_components_with_streams, \ + create_for_platforms, create_and_start_container, \ + create_and_start_container_for_components, create_and_start_container_for_platforms, \ + stop_and_remove_container, cleanup_discovery, select_docker_host, unselect_docker_host diff --git a/docker/dockerplugin/decorators.py b/docker/dockerplugin/decorators.py new file mode 100644 index 0000000..089231a --- /dev/null +++ b/docker/dockerplugin/decorators.py @@ -0,0 +1,80 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. 
# ---- docker/dockerplugin/decorators.py ----

import copy
from cloudify import ctx
from cloudify.exceptions import NonRecoverableError, RecoverableError
from dockering import utils as doc
from dockerplugin import discovery as dis
from dockerplugin.exceptions import DockerPluginDeploymentError, \
    DockerPluginDependencyNotReadyError


def monkeypatch_loggers(task_func):
    """Decorator: point dependent modules' loggers at the Cloudify logger.

    The dockering and discovery modules each hold a module-level logger;
    both are redirected to ctx.logger before the task runs so their output
    lands in the Cloudify operation log.
    """

    def wrapper(**task_kwargs):
        # Ouch! Monkeypatch loggers
        doc.logger = ctx.logger
        dis.logger = ctx.logger

        return task_func(**task_kwargs)

    return wrapper


def wrap_error_handling_start(task_start_func):
    """Decorator: map start-operation failures onto Cloudify error types."""

    def wrapper(**task_kwargs):
        try:
            return task_start_func(**task_kwargs)
        except DockerPluginDependencyNotReadyError as err:
            # Something required (e.g. a working Docker host) is not
            # available yet, so let Cloudify try again later.
            raise RecoverableError(err)
        except DockerPluginDeploymentError as err:
            # Container failed to come up in the allotted time. This is
            # deemed non-recoverable.
            raise NonRecoverableError(err)
        except Exception as err:
            ctx.logger.error("Unexpected error while starting container: {0}"
                .format(str(err)))
            raise NonRecoverableError(err)

    return wrapper


def merge_inputs_for_start(task_start_func):
    """Decorator: merge runtime properties and call kwargs into one dict."""

    def wrapper(**task_kwargs):
        merged_inputs = copy.deepcopy(ctx.instance.runtime_properties)
        merged_inputs.update(task_kwargs)

        # kwargs may carry "ctx" (a cloudify.context.CloudifyContext). It
        # must not be copied into runtime_properties or JSON serialization
        # of the properties fails.
        merged_inputs.pop("ctx", None)

        return task_start_func(**merged_inputs)

    return wrapper


# ---- docker/dockerplugin/discovery.py ----
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
from functools import partial
import json
import logging
import uuid
import requests
import consul


# Module-level logger; monkeypatched to ctx.logger by
# decorators.monkeypatch_loggers when running inside a Cloudify operation.
logger = logging.getLogger("discovery")


class DiscoveryError(RuntimeError):
    # Generic discovery failure (e.g. a Consul KV write was rejected).
    pass

class DiscoveryConnectionError(RuntimeError):
    # Could not reach Consul; callers typically map this to a Cloudify
    # RecoverableError so the operation is retried.
    pass

class DiscoveryServiceNotFoundError(RuntimeError):
    # The requested service has no entries in the Consul catalog.
    pass


def _wrap_consul_call(consul_func, *args, **kwargs):
    """Wrap a Consul client call so transport errors surface as
    DiscoveryConnectionError instead of a raw requests exception."""
    try:
        return consul_func(*args, **kwargs)
    except requests.exceptions.ConnectionError as e:
        raise DiscoveryConnectionError(e)


def generate_service_component_name(service_component_type, service_id, location_id):
    """Generate service component id used to pass into the service component
    instance and used as the key to the service component configuration.

    Format:
        <service component id>.<service component type>.<service id>.<location id>.dcae.com

    TODO: The format will evolve.
    """
    # Random generated — a fresh UUID per call, so the name is unique even
    # for repeated deployments of the same component type.
    service_component_id = str(uuid.uuid4())
    return "{0}.{1}.{2}.{3}.dcae.com".format(
        service_component_id, service_component_type, service_id, location_id)


def create_kv_conn(host):
    """Create connection to key-value store.

    ``host`` may be "hostname" or "hostname:port". Returns a Consul client
    to the specified Consul host.
    """
    try:
        [hostname, port] = host.split(":")
        return consul.Consul(host=hostname, port=int(port))
    except ValueError as e:
        # No ":" in host (or a non-numeric port): fall back to the client's
        # default port.
        return consul.Consul(host=host)

def push_service_component_config(kv_conn, service_component_name, config):
    """Store a component's application config in Consul KV under its name.

    ``config`` may be a dict (JSON-serialized here) or an already-encoded
    string. Raises DiscoveryError if the KV put is rejected.
    """
    config_string = config if isinstance(config, str) else json.dumps(config)
    kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)

    if kv_put_func(service_component_name, config_string):
        logger.info("Added config for {0}".format(service_component_name))
    else:
        raise DiscoveryError("Failed to push configuration")

def remove_service_component_config(kv_conn, service_component_name):
    """Delete a component's config entry from Consul KV (best-effort)."""
    kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
    kv_delete_func(service_component_name)


def _create_rel_key(service_component_name):
    # Relationship entries live next to the config entry under "<name>:rel".
    return "{0}:rel".format(service_component_name)

def store_relationship(kv_conn, source_name, target_name):
    """Record a source->target relationship in Consul KV as a JSON list."""
    # TODO: Rel entry may already exist in a one-to-many situation. Need to
    # support that.
    rel_key = _create_rel_key(source_name)
    rel_value = [target_name] if target_name else []

    kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
    kv_put_func(rel_key, json.dumps(rel_value))
    logger.info("Added relationship for {0}".format(rel_key))

def delete_relationship(kv_conn, service_component_name):
    """Remove a component's relationship entry.

    Returns the list of target names that were stored, or [] when no
    relationship entry existed.
    """
    rel_key = _create_rel_key(service_component_name)
    kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
    index, rels = kv_get_func(rel_key)

    if rels:
        # Consul returns the value as raw bytes under "Value".
        rels = json.loads(rels["Value"].decode("utf-8"))
        kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
        kv_delete_func(rel_key)
        return rels
    else:
        return []

def lookup_service(kv_conn, service_component_name):
    """Return the Consul catalog entries for a service.

    Raises DiscoveryServiceNotFoundError when the catalog has no entries.
    """
    catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service)
    index, results = catalog_get_func(service_component_name)

    if results:
        return results
    else:
        raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name))


# TODO: Note these functions have been (for the most part) shamelessly lifted from
# dcae-cli and should really be shared.
def _is_healthy_pure(get_health_func, instance):
    """Checks to see if a component instance is running healthy

    Pure function edition

    Args
    ----
    get_health_func: func(string) -> complex object
        Look at unittests in test_discovery to see examples
    instance: (string) fully qualified name of component instance

    Returns
    -------
    True if instance has been found and is healthy else False
    """
    index, resp = get_health_func(instance)

    if resp:
        # NOTE(review): the inner "instance" shadows the outer parameter —
        # intentional here (each element of resp is one instance record),
        # but worth renaming eventually.
        def is_passing(instance):
            # An instance is healthy only when every one of its checks passes.
            return all([check["Status"] == "passing" for check in instance["Checks"]])

        # Healthy if at least one registered instance has all checks passing.
        return any([is_passing(instance) for instance in resp])
    else:
        return False

def is_healthy(consul_host, instance):
    """Checks to see if a component instance is running healthy

    Impure function edition

    Args
    ----
    consul_host: (string) host string of Consul
    instance: (string) fully qualified name of component instance

    Returns
    -------
    True if instance has been found and is healthy else False
    """
    cons = create_kv_conn(consul_host)

    get_health_func = partial(_wrap_consul_call, cons.health.service)
    return _is_healthy_pure(get_health_func, instance)


def add_to_entry(conn, key, add_name, add_value):
    """
    Find 'key' in consul.
    Treat its value as a JSON string representing a dict.
    Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
    Turn the resulting extended dict into a JSON string.
    Store the string back into Consul under 'key'.
    Watch out for conflicting concurrent updates (check-and-set loop below).

    Example:
        Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
        add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
        should result in the value for key 'xyz:dmaap' in consul being updated to
        '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
    """
    while True:  # do until update succeeds
        (index, val) = conn.kv.get(key)  # index gives version of key retrieved

        if val is None:  # no key yet
            vstring = '{}'
            mod_index = 0  # Use 0 as the cas index for initial insertion of the key
        else:
            vstring = val['Value']
            mod_index = val['ModifyIndex']

        # Build the updated dict
        # Exceptions just propagate
        v = json.loads(vstring)
        v[add_name] = add_value
        new_vstring = json.dumps(v)

        # cas= makes this a check-and-set write: if the key changed since
        # retrieval, the put returns False and we loop to retry.
        updated = conn.kv.put(key, new_vstring, cas=mod_index)
        if updated:
            return v


# ---- docker/dockerplugin/exceptions.py ----
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

class DockerPluginDeploymentError(RuntimeError):
    """Raised when a deployed container fails to become healthy in time;
    treated as non-recoverable by the start-operation error handling."""
    pass


class DockerPluginDependencyNotReadyError(RuntimeError):
    """Error to use when something that this plugin depends upon e.g. docker api,
    consul is not ready; treated as recoverable (Cloudify retries later)."""
    pass


# ---- docker/dockerplugin/tasks.py ----
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
# Lifecycle interface calls for DockerContainer

import json, time, copy
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
import dockering as doc
from dockerplugin import discovery as dis
from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
    merge_inputs_for_start
from dockerplugin.exceptions import DockerPluginDeploymentError, \
    DockerPluginDependencyNotReadyError
from dockerplugin import utils

# TODO: Remove this Docker port hardcoding and query for this port instead
DOCKER_PORT = 2376
# Always use the local Consul agent for interfacing with Consul from the plugin.
# Safe to assume that its always there.
CONSUL_HOST = "localhost"

# Used to construct delivery urls for data router subscribers. Data router in FTL
# requires https but this author believes that ONAP is to be defaulted to http.
DEFAULT_SCHEME = "http"

# Property keys (names of entries in ctx.instance.runtime_properties)
SERVICE_COMPONENT_NAME = "service_component_name"
SELECTED_CONTAINER_DESTINATION = "selected_container_destination"
CONTAINER_ID = "container_id"

# Lifecycle interface calls for dcae.nodes.DockerContainer

def _setup_for_discovery(**kwargs):
    """Setup for config discovery.

    Pushes kwargs['application_config'] into Consul KV keyed by
    kwargs['name']. Returns kwargs unchanged so the _setup/_done call chain
    can continue. Connection problems become RecoverableError (retry);
    anything else becomes NonRecoverableError.
    """
    try:
        name = kwargs['name']
        application_config = kwargs['application_config']

        # NOTE: application_config is no longer a json string and is inputed as a
        # YAML map which translates to a dict. We don't have to do any
        # preprocessing anymore.
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.push_service_component_config(conn, name, application_config)
        return kwargs
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while pushing configuration: {0}"
            .format(str(e)))
        raise NonRecoverableError(e)

def _generate_component_name(**kwargs):
    """Generate component name and stash it under kwargs['name'].

    An explicit 'service_component_name_override' wins over the generated
    name.
    """
    service_component_type = kwargs['service_component_type']
    service_id = kwargs['service_id']
    location_id = kwargs['location_id']

    name_override = kwargs['service_component_name_override']

    kwargs['name'] = name_override if name_override \
        else dis.generate_service_component_name(service_component_type,
            service_id, location_id)
    return kwargs

def _done_for_create(**kwargs):
    """Wrap up create operation: persist everything into runtime_properties."""
    name = kwargs['name']
    kwargs[SERVICE_COMPONENT_NAME] = name
    # All updates to the runtime_properties happens here. I don't see a reason
    # why we shouldn't do this because the context is not being mutated by
    # something else and will keep the other functions pure (pure in the sense
    # not dealing with CloudifyContext).
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done setting up: {0}".format(name))
    return kwargs


@monkeypatch_loggers
@operation
def create_for_components(**kwargs):
    """Create step for Docker containers that are components

    This interface is responsible for:

    1. Generating service component name
    2. Populating config information into Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_generate_component_name(
                **ctx.node.properties)))


def _parse_streams(**kwargs):
    """Parse streams and setup for DMaaP plugin"""
    # The DMaaP plugin requires this plugin to set the runtime properties
    # keyed by the node name.
    def setup_publishes(s):
        kwargs[s["name"]] = s

    # NOTE(review): under Python 3, map() is lazy, so these side effects
    # would never run — this code relies on Python 2 map() semantics.
    # Confirm the target interpreter before porting.
    map(setup_publishes, kwargs["streams_publishes"])

    def setup_subscribes(s):
        if s["type"] == "data_router":
            # If username and password has not been provided then generate it.
            # The DMaaP plugin doesn't generate for subscribers. The generation
            # code and length of username password has been lifted from the
            # DMaaP plugin.

            # Don't want to mutate the source
            s = copy.deepcopy(s)
            if not s.get("username", None):
                s["username"] = utils.random_string(8)
            if not s.get("password", None):
                s["password"] = utils.random_string(10)

        kwargs[s["name"]] = s

    # NOTE: That the delivery url is constructed and setup in the start operation
    map(setup_subscribes, kwargs["streams_subscribes"])

    return kwargs

def _setup_for_discovery_streams(**kwargs):
    """Setup for discovery of streams

    Specifically, there's a race condition this call addresses for data router
    subscriber case. The component needs its feed subscriber information but the
    DMaaP plugin doesn't provide this until after the docker plugin start
    operation.
    """
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
        if s["type"] == "data_router"]

    if dr_subs:
        # Feed info lives next to the component config under "<name>:dmaap".
        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
        conn = dis.create_kv_conn(CONSUL_HOST)

        def add_feed(dr_sub):
            # delivery url and subscriber id will be filled in by the dmaap
            # plugin later
            v = { "location": dr_sub["location"], "delivery_url": None,
                "username": dr_sub["username"], "password": dr_sub["password"],
                "subscriber_id": None }
            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None

        try:
            # all() fully consumes the map, so the side effects do execute here
            # (unlike the bare map() calls in _parse_streams).
            if all(map(add_feed, dr_subs)):
                return kwargs
        except Exception as e:
            raise NonRecoverableError(e)

        # You should never get here
        raise NonRecoverableError("Failure updating feed streams in Consul")
    else:
        return kwargs


@monkeypatch_loggers
@operation
def create_for_components_with_streams(**kwargs):
    """Create step for Docker containers that are components that use DMaaP

    This interface is responsible for:

    1. Generating service component name
    2. Setup runtime properties for DMaaP plugin
    3. Populating application config into Consul
    4. Populating DMaaP config for data router subscribers in Consul
    """
    _done_for_create(
        **_setup_for_discovery(
            **_setup_for_discovery_streams(
                **_parse_streams(
                    **_generate_component_name(
                        **ctx.node.properties)))))


@monkeypatch_loggers
@operation
def create_for_platforms(**kwargs):
    """Create step for Docker containers that are platform components

    This interface is responsible for:

    1. Populating config information into Consul

    (Platform components have fixed names, so no name generation here.)
    """
    _done_for_create(
        **_setup_for_discovery(
            **ctx.node.properties))


def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
        with_port=False):
    """Resolve a service name via Consul to an address (optionally with port).

    Raises dis.DiscoveryServiceNotFoundError when the service is unknown.
    """
    conn = dis.create_kv_conn(consul_host)
    results = dis.lookup_service(conn, service_component_name)

    if with_port:
        # Just grab first
        result = results[0]
        return "{address}:{port}".format(address=result["ServiceAddress"],
            port=result["ServicePort"])
    else:
        return results[0]["ServiceAddress"]


def _verify_container(service_component_name, max_wait, consul_host=CONSUL_HOST):
    """Verify that the container is healthy

    Args:
    -----
    max_wait (integer): limit to how may attempts to make which translates to
        seconds because each sleep is one second. 0 means infinite.

    Return:
    -------
    True if component is healthy else a DockerPluginDeploymentError exception
    will be raised.
    """
    num_attempts = 1

    while True:
        if dis.is_healthy(consul_host, service_component_name):
            return True
        else:
            num_attempts += 1

            if max_wait > 0 and max_wait < num_attempts:
                raise DockerPluginDeploymentError("Container never became healthy")

            # Poll once per second until healthy or out of attempts.
            time.sleep(1)


def _create_and_start_container(container_name, image, docker_host,
        consul_host=CONSUL_HOST, **kwargs):
    """Create and start Docker container

    This is the function that actually does more of the heavy lifting including
    resolving the docker host to connect and common things to do in setting up
    docker containers like making sure CONSUL_HOST gets set as the local docker
    host ip.

    This method raises DockerPluginDependencyNotReadyError
    """
    try:
        # Setup for Docker operations

        docker_host_ip = _lookup_service(docker_host, consul_host=consul_host)

        client = doc.create_client(docker_host_ip, DOCKER_PORT)

        hcp = doc.add_host_config_params_volumes(volumes=kwargs.get("volumes",
            None))
        hcp = doc.add_host_config_params_ports(ports=kwargs.get("ports", None),
            host_config_params=hcp)
        hcp = doc.add_host_config_params_dns(docker_host_ip,
            host_config_params=hcp)

        # NOTE: The critical env variable CONSUL_HOST is being assigned the
        # docker host ip itself because there should be a local Consul agent. We
        # want services to register with their local Consul agent.
        # CONFIG_BINDING_SERVICE is here for backwards compatibility. This is a
        # well-known name now.
        platform_envs = { "CONSUL_HOST": docker_host_ip,
            "CONFIG_BINDING_SERVICE": "config_binding_service" }
        # NOTE: The order of the envs being passed in is **important**. The
        # kwargs["envs"] getting passed in last ensures that manual overrides
        # will override the hardcoded envs.
        envs = doc.create_envs(container_name, platform_envs, kwargs.get("envs", {}))

        # Do Docker operations

        container = doc.create_container(client, image, container_name, envs, hcp)
        container_id = doc.start_container(client, container)

        return container_id
    except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
            dis.DiscoveryServiceNotFoundError) as e:
        # All of these mean a dependency isn't up yet; recoverable upstream.
        raise DockerPluginDependencyNotReadyError(e)


def _enhance_docker_params(**kwargs):
    """Setup Docker envs and merge ports/volumes from docker_config."""
    docker_config = kwargs.get("docker_config", {})

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
        if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)
    kwargs["envs"] = envs

    def combine_params(key, docker_config, kwargs):
        # Concatenate the two lists for `key`; only set when non-empty.
        v = docker_config.get(key, []) + kwargs.get(key, [])
        if v:
            kwargs[key] = v
        return kwargs

    # Add the lists of ports and volumes unintelligently - meaning just add the
    # lists together with no deduping.
    kwargs = combine_params("ports", docker_config, kwargs)
    kwargs = combine_params("volumes", docker_config, kwargs)

    return kwargs

def _create_and_start_component(**kwargs):
    """Create and start component (container)"""
    image = kwargs["image"]
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
    # Need to be picky and manually select out pieces because just using kwargs
    # which contains everything confused the execution of
    # _create_and_start_container because duplicate variables exist
    sub_kwargs = { "volumes": kwargs.get("volumes", []),
        "ports": kwargs.get("ports", None), "envs": kwargs.get("envs", {}) }

    container_id = _create_and_start_container(service_component_name, image,
        docker_host, **sub_kwargs)
    kwargs[CONTAINER_ID] = container_id

    # TODO: Use regular logging here
    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))

    return kwargs

def _verify_component(**kwargs):
    """Verify component (container) is healthy"""
    service_component_name = kwargs[SERVICE_COMPONENT_NAME]
    # TODO: "Consul doesn't make its first health check immediately upon registration.
    # Instead it waits for the health check interval to pass."
    # Possible enhancement is to read the interval (and possibly the timeout) from
    # docker_config and multiply that by a number to come up with a more suitable
    # max_wait.
    max_wait = kwargs.get("max_wait", 300)

    # Verify that the container is healthy

    if _verify_container(service_component_name, max_wait):
        container_id = kwargs[CONTAINER_ID]
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]

        # TODO: Use regular logging here
        ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
            service_component_name))

    return kwargs

def _done_for_start(**kwargs):
    # Persist the merged start inputs (including container id) back into the
    # instance's runtime properties.
    ctx.instance.runtime_properties.update(kwargs)
    ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
    return kwargs

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components(**start_inputs):
    """Create Docker container and start for components

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
        **_verify_component(
            **_create_and_start_component(
                **_enhance_docker_params(**start_inputs))))


def _update_delivery_url(**kwargs):
    """Update the delivery url for data router subscribers"""
    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
        if s["type"] == "data_router"]

    if dr_subs:
        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
        # TODO: Should NOT be setting up the delivery url with ip addresses
        # because in the https case, this will not work because data router does
        # a certificate validation using the fqdn.
        subscriber_host = _lookup_service(service_component_name, with_port=True)

        for dr_sub in dr_subs:
            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
            path = dr_sub["route"]
            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
                scheme=scheme, host=subscriber_host, path=path)
            kwargs[dr_sub["name"]] = dr_sub

    return kwargs

@wrap_error_handling_start
@merge_inputs_for_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_components_with_streams(**start_inputs):
    """Create Docker container and start for components that have streams

    This operation method is to be used with the DockerContainerForComponents
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    _done_for_start(
        **_update_delivery_url(
            **_verify_component(
                **_create_and_start_component(
                    **_enhance_docker_params(**start_inputs)))))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container_for_platforms(**kwargs):
    """Create Docker container and start for platform services

    This operation method is to be used with the DockerContainerForPlatforms
    node type. After launching the container, the plugin will verify with Consul
    that the app is up and healthy before terminating.
    """
    image = ctx.node.properties["image"]
    docker_config = ctx.node.properties.get("docker_config", {})
    service_component_name = ctx.node.properties["name"]

    docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

    envs = kwargs.get("envs", {})
    # NOTE: Healthchecks are optional until prepared to handle use cases that
    # don't necessarily use http
    envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
        if "healthcheck" in docker_config else {}
    envs.update(envs_healthcheck)
    kwargs["envs"] = envs

    host_port = ctx.node.properties["host_port"]
    container_port = ctx.node.properties["container_port"]

    # Cloudify properties are all required and Cloudify complains that None
    # is not a valid type for integer. Defaulting to 0 to indicate to not
    # use this and not to set a specific port mapping in cases like service
    # change handler.
    if host_port != 0 and container_port != 0:
        # Doing this because other nodes might want to use this property
        # NOTE(review): the mapping string is "<container>:<host>" — confirm
        # that doc.add_host_config_params_ports expects that ordering.
        port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
        ports = kwargs.get("ports", []) + [ port_mapping ]
        kwargs["ports"] = ports
    if "ports" not in kwargs:
        ctx.logger.warn("No port mappings defined. Will randomly assign port.")

    container_id = _create_and_start_container(service_component_name, image,
        docker_host, **kwargs)
    ctx.instance.runtime_properties[CONTAINER_ID] = container_id

    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))

    # Verify that the container is healthy

    max_wait = kwargs.get("max_wait", 300)

    if _verify_container(service_component_name, max_wait):
        ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
            service_component_name))


@wrap_error_handling_start
@monkeypatch_loggers
@operation
def create_and_start_container(**kwargs):
    """Create Docker container and start"""
    service_component_name = ctx.node.properties["name"]
    ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name

    image = ctx.node.properties["image"]
    docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

    container_id = _create_and_start_container(service_component_name, image,
        docker_host, **kwargs)
    ctx.instance.runtime_properties[CONTAINER_ID] = container_id

    ctx.logger.info("Container started: {0}, {1}".format(container_id,
        service_component_name))


@monkeypatch_loggers
@operation
def stop_and_remove_container(**kwargs):
    """Stop and remove Docker container"""
    try:
        docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]

        docker_host_ip = _lookup_service(docker_host)

        client = doc.create_client(docker_host_ip, DOCKER_PORT)

        container_id = ctx.instance.runtime_properties[CONTAINER_ID]
        doc.stop_then_remove_container(client, container_id)

        # Optionally also delete the image from the Docker host.
        cleanup_image = kwargs.get("cleanup_image", False)

        if cleanup_image:
            image = ctx.node.properties["image"]

            if doc.remove_image(client, image):
                ctx.logger.info("Removed Docker image: {0}".format(image))
            else:
                ctx.logger.warn("Couldnot remove Docker image: {0}".format(image))
    except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
            dis.DiscoveryServiceNotFoundError) as e:
        raise RecoverableError(e)
    except Exception as e:
        ctx.logger.error("Unexpected error while stopping container: {0}"
            .format(str(e)))
        raise NonRecoverableError(e)

@monkeypatch_loggers
@operation
def cleanup_discovery(**kwargs):
    """Delete configuration from Consul"""
    service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]

    try:
        conn = dis.create_kv_conn(CONSUL_HOST)
        dis.remove_service_component_config(conn, service_component_name)
    except dis.DiscoveryConnectionError as e:
        raise RecoverableError(e)


# Lifecycle interface calls for dcae.nodes.DockerHost

@operation
def select_docker_host(**kwargs):
    # Only the explicit override is supported here; without it selection fails.
    selected_docker_host = ctx.node.properties['docker_host_override']

    if selected_docker_host:
        ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host
        ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host))
    else:
        raise NonRecoverableError("Failed to find a suitable Docker host")

@operation
def unselect_docker_host(**kwargs):
    # Raises KeyError if no host was previously selected.
    del ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
    ctx.logger.info("Unselected Docker host")


# ---- docker/dockerplugin/utils.py ----
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

import string
import random
import secrets


def random_string(n):
    """Randomly generate an ascii string of "n" length.

    The result is drawn from lowercase, uppercase and digit characters.
    Callers in tasks.py use this to generate data router subscriber
    usernames and passwords, i.e. credentials, so use the ``secrets``
    module (CSPRNG) rather than ``random`` (Mersenne Twister, predictable).

    Args:
        n (int): length of the string to generate; 0 yields "".

    Returns:
        str: random alphanumeric string of length n.
    """
    corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits
    return ''.join(secrets.choice(corpus) for _ in range(n))