diff options
author | Miroslav Los <miroslav.los@pantheon.tech> | 2019-12-18 13:10:24 +0100 |
---|---|---|
committer | Miroslav Los <miroslav.los@pantheon.tech> | 2020-01-07 10:25:09 +0100 |
commit | e0b1912b608523881e43e5d7e22610fafba8fac4 (patch) | |
tree | 544d4d830a40c0ebe790ca24fb539ca203fa1492 /docker/dockerplugin | |
parent | f31bd85266b8bdb7d95bb6f6e2f6d48967278f9a (diff) |
Remove obsolete cdap and docker plugins
Signed-off-by: Miroslav Los <miroslav.los@pantheon.tech>
Issue-ID: DCAEGEN2-1987
Change-Id: I7e7114458a2931b8f1baf915f1714ee8465b86e5
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r-- | docker/dockerplugin/__init__.py | 31 | ||||
-rw-r--r-- | docker/dockerplugin/decorators.py | 102 | ||||
-rw-r--r-- | docker/dockerplugin/discovery.py | 257 | ||||
-rw-r--r-- | docker/dockerplugin/exceptions.py | 29 | ||||
-rw-r--r-- | docker/dockerplugin/tasks.py | 672 | ||||
-rw-r--r-- | docker/dockerplugin/utils.py | 44 |
6 files changed, 0 insertions, 1135 deletions
diff --git a/docker/dockerplugin/__init__.py b/docker/dockerplugin/__init__.py deleted file mode 100644 index 669e196..0000000 --- a/docker/dockerplugin/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -# REVIEW: Tried to source the version from here but you run into import issues -# because "tasks" module is loaded. This method seems to be the PEP 396 -# recommended way and is listed #3 here https://packaging.python.org/single_source_version/ -# __version__ = '0.1.0' - -from .tasks import create_for_components, create_for_components_with_streams, \ - create_and_start_container_for_components_with_streams, \ - create_for_platforms, create_and_start_container, \ - create_and_start_container_for_components, create_and_start_container_for_platforms, \ - stop_and_remove_container, cleanup_discovery, select_docker_host, unselect_docker_host, \ - policy_update diff --git a/docker/dockerplugin/decorators.py b/docker/dockerplugin/decorators.py deleted file mode 100644 index f83263b..0000000 --- a/docker/dockerplugin/decorators.py +++ /dev/null @@ -1,102 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import copy -from cloudify import ctx -from cloudify.exceptions import NonRecoverableError, RecoverableError -from dockering import utils as doc -from dockerplugin import discovery as dis -from dockerplugin.exceptions import DockerPluginDeploymentError, \ - DockerPluginDependencyNotReadyError -from dockerplugin import utils - - -def monkeypatch_loggers(task_func): - """Sets up the dependent loggers""" - - def wrapper(**kwargs): - # Ouch! Monkeypatch loggers - doc.logger = ctx.logger - dis.logger = ctx.logger - - return task_func(**kwargs) - - return wrapper - - -def wrap_error_handling_start(task_start_func): - """Wrap error handling for the start operations""" - - def wrapper(**kwargs): - try: - return task_start_func(**kwargs) - except DockerPluginDependencyNotReadyError as e: - # You are here because things we need like a working docker host is not - # available yet so let Cloudify try again later. - raise RecoverableError(e) - except DockerPluginDeploymentError as e: - # Container failed to come up in the allotted time. This is deemed - # non-recoverable. - raise NonRecoverableError(e) - except Exception as e: - ctx.logger.error("Unexpected error while starting container: {0}" - .format(str(e))) - raise NonRecoverableError(e) - - return wrapper - - -def _wrapper_merge_inputs(task_func, properties, **kwargs): - """Merge Cloudify properties with input kwargs before calling task func""" - inputs = copy.deepcopy(properties) - # Recursively update - utils.update_dict(inputs, kwargs) - - # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext - # This has to be removed and not copied into runtime_properties else you get - # JSON serialization errors. - if "ctx" in inputs: - del inputs["ctx"] - - return task_func(**inputs) - -def merge_inputs_for_create(task_create_func): - """Merge all inputs for start operation into one dict""" - - # Needed to wrap the wrapper because I was seeing issues with - # "RuntimeError: No context set in current execution thread" - def wrapper(**kwargs): - # NOTE: ctx.node.properties is an ImmutableProperties instance which is - # why it is passed into a mutable dict so that it can be deep copied - return _wrapper_merge_inputs(task_create_func, - dict(ctx.node.properties), **kwargs) - - return wrapper - -def merge_inputs_for_start(task_start_func): - """Merge all inputs for start operation into one dict""" - - # Needed to wrap the wrapper because I was seeing issues with - # "RuntimeError: No context set in current execution thread" - def wrapper(**kwargs): - return _wrapper_merge_inputs(task_start_func, - ctx.instance.runtime_properties, **kwargs) - - return wrapper diff --git a/docker/dockerplugin/discovery.py b/docker/dockerplugin/discovery.py deleted file mode 100644 index 563693c..0000000 --- a/docker/dockerplugin/discovery.py +++ /dev/null @@ -1,257 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# Copyright (c) 2019 Pantheon.tech. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -from functools import partial -import json -import logging -import uuid -import requests -import consul - - -logger = logging.getLogger("discovery") - - -class DiscoveryError(RuntimeError): - pass - -class DiscoveryConnectionError(RuntimeError): - pass - -class DiscoveryServiceNotFoundError(RuntimeError): - pass - -class DiscoveryKVEntryNotFoundError(RuntimeError): - pass - - -def _wrap_consul_call(consul_func, *args, **kwargs): - """Wrap Consul call to map errors""" - try: - return consul_func(*args, **kwargs) - except requests.exceptions.ConnectionError as e: - raise DiscoveryConnectionError(e) - - -def generate_service_component_name(service_component_type): - """Generate service component id used to pass into the service component - instance and used as the key to the service component configuration. - - Format: - <service component id>_<service component type> - """ - # Random generated - # Copied from cdap plugin - return "{0}_{1}".format(str(uuid.uuid4()).replace("-",""), - service_component_type) - - -def create_kv_conn(host): - """Create connection to key-value store - - Returns a Consul client to the specified Consul host""" - try: - [hostname, port] = host.split(":") - return consul.Consul(host=hostname, port=int(port)) - except ValueError as e: - return consul.Consul(host=host) - -def push_service_component_config(kv_conn, service_component_name, config): - config_string = config if isinstance(config, str) else json.dumps(config) - kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) - - if kv_put_func(service_component_name, config_string): - logger.info("Added config for {0}".format(service_component_name)) - else: - raise DiscoveryError("Failed to push configuration") - -def remove_service_component_config(kv_conn, service_component_name): - kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) - kv_delete_func(service_component_name) - - -def get_kv_value(kv_conn, key): - """Get a key-value entry's value from Consul - - Raises DiscoveryKVEntryNotFoundError if entry not found - """ - kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) - (index, val) = kv_get_func(key) - - if val: - return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate - else: - raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key)) - - -def _create_rel_key(service_component_name): - return "{0}:rel".format(service_component_name) - -def store_relationship(kv_conn, source_name, target_name): - # TODO: Rel entry may already exist in a one-to-many situation. Need to - # support that. - rel_key = _create_rel_key(source_name) - rel_value = [target_name] if target_name else [] - - kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put) - kv_put_func(rel_key, json.dumps(rel_value)) - logger.info("Added relationship for {0}".format(rel_key)) - -def delete_relationship(kv_conn, service_component_name): - rel_key = _create_rel_key(service_component_name) - kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) - index, rels = kv_get_func(rel_key) - - if rels: - rels = json.loads(rels["Value"].decode("utf-8")) - kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete) - kv_delete_func(rel_key) - return rels - else: - return [] - -def lookup_service(kv_conn, service_component_name): - catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service) - index, results = catalog_get_func(service_component_name) - - if results: - return results - else: - raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name)) - - -# TODO: Note these functions have been (for the most part) shamelessly lifted from -# dcae-cli and should really be shared. - -def _is_healthy_pure(get_health_func, instance): - """Checks to see if a component instance is running healthy - - Pure function edition - - Args - ---- - get_health_func: func(string) -> complex object - Look at unittests in test_discovery to see examples - instance: (string) fully qualified name of component instance - - Returns - ------- - True if instance has been found and is healthy else False - """ - index, resp = get_health_func(instance) - - if resp: - def is_passing(instance): - return all([check["Status"] == "passing" for check in instance["Checks"]]) - - return any([is_passing(instance) for instance in resp]) - else: - return False - -def is_healthy(consul_host, instance): - """Checks to see if a component instance is running healthy - - Impure function edition - - Args - ---- - consul_host: (string) host string of Consul - instance: (string) fully qualified name of component instance - - Returns - ------- - True if instance has been found and is healthy else False - """ - cons = create_kv_conn(consul_host) - - get_health_func = partial(_wrap_consul_call, cons.health.service) - return _is_healthy_pure(get_health_func, instance) - - -def add_to_entry(conn, key, add_name, add_value): - """ - Find 'key' in consul. - Treat its value as a JSON string representing a dict. - Extend the dict by adding an entry with key 'add_name' and value 'add_value'. - Turn the resulting extended dict into a JSON string. - Store the string back into Consul under 'key'. - Watch out for conflicting concurrent updates. - - Example: - Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' - add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) - should result in the value for key 'xyz:dmaap' in consul being updated to - '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' - """ - while True: # do until update succeeds - (index, val) = conn.kv.get(key) # index gives version of key retrieved - - if val is None: # no key yet - vstring = '{}' - mod_index = 0 # Use 0 as the cas index for initial insertion of the key - else: - vstring = val['Value'] - mod_index = val['ModifyIndex'] - - # Build the updated dict - # Exceptions just propagate - v = json.loads(vstring) - v[add_name] = add_value - new_vstring = json.dumps(v) - - updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false - if updated: - return v - - -def _find_matching_services(services, name_search, tags): - """Find matching services given search criteria""" - tags = set(tags) - return [srv_name for srv_name in services - if name_search in srv_name and tags <= set(services[srv_name])] - -def search_services(conn, name_search, tags): - """Search for services that match criteria - - Args: - ----- - name_search: (string) Name to search for as a substring - tags: (list) List of strings that are tags. A service must match **all** the - tags in the list. - - Retruns: - -------- - List of names of services that matched - """ - # srvs is dict where key is service name and value is list of tags - catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services) - index, srvs = catalog_get_services_func() - - if srvs: - matches = _find_matching_services(srvs, name_search, tags) - - if matches: - return matches - - raise DiscoveryServiceNotFoundError( - "No matches found: {0}, {1}".format(name_search, tags)) - else: - raise DiscoveryServiceNotFoundError("No services found") diff --git a/docker/dockerplugin/exceptions.py b/docker/dockerplugin/exceptions.py deleted file mode 100644 index 0d8a341..0000000 --- a/docker/dockerplugin/exceptions.py +++ /dev/null @@ -1,29 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -class DockerPluginDeploymentError(RuntimeError): - pass - - -class DockerPluginDependencyNotReadyError(RuntimeError): - """Error to use when something that this plugin depends upon e.g. docker api, - consul is not ready""" - pass - diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py deleted file mode 100644 index 8a15319..0000000 --- a/docker/dockerplugin/tasks.py +++ /dev/null @@ -1,672 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# Copyright (c) 2019 Pantheon.tech. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -# Lifecycle interface calls for DockerContainer - -import json, time, copy, random -from cloudify import ctx -from cloudify.decorators import operation -from cloudify.exceptions import NonRecoverableError, RecoverableError -import dockering as doc -from onap_dcae_dcaepolicy_lib import Policies -from dockerplugin import discovery as dis -from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ - merge_inputs_for_start, merge_inputs_for_create -from dockerplugin.exceptions import DockerPluginDeploymentError, \ - DockerPluginDependencyNotReadyError -from dockerplugin import utils - -# TODO: Remove this Docker port hardcoding and query for this port instead -DOCKER_PORT = 2376 -# Rely on the setup of the cloudify manager host to resolve "consul" for the -# plugin. NOTE: This variable is not passed to components. -CONSUL_HOST = "consul" - -# Used to construct delivery urls for data router subscribers. Data router in FTL -# requires https but this author believes that ONAP is to be defaulted to http. -DEFAULT_SCHEME = "http" - -# Property keys -SERVICE_COMPONENT_NAME = "service_component_name" -SELECTED_CONTAINER_DESTINATION = "selected_container_destination" -CONTAINER_ID = "container_id" -APPLICATION_CONFIG = "application_config" - - -# Utility methods - -def _get_docker_logins(consul_host=CONSUL_HOST): - """Get Docker logins - - The assumption is that all Docker logins to be used will be available in - Consul's key-value store under "docker_plugin/docker_logins" as a list of - json objects where each object is a single login: - - [{ "username": "dcae_dev_ro", "password": "att123ro", - "registry": "nexus01.research.att.com:18443" }] - """ - # REVIEW: The error handling may have to be re-examined. The current thought is - # that the logins *must* be setup even with an empty list otherwise the task - # will fail (fail fast). One alterative is to pass back empty list upon any - # issues but this would push potential issues to a later point of the - # deployment. - kv_conn = dis.create_kv_conn(consul_host) - return dis.get_kv_value(kv_conn, "docker_plugin/docker_logins") - - -# Lifecycle interface calls for dcae.nodes.DockerContainer - -def _setup_for_discovery(**kwargs): - """Setup for config discovery""" - try: - name = kwargs['name'] - application_config = kwargs[APPLICATION_CONFIG] - - # NOTE: application_config is no longer a json string and is inputed as a - # YAML map which translates to a dict. We don't have to do any - # preprocessing anymore. - conn = dis.create_kv_conn(CONSUL_HOST) - dis.push_service_component_config(conn, name, application_config) - return kwargs - except dis.DiscoveryConnectionError as e: - raise RecoverableError(e) - except Exception as e: - ctx.logger.error("Unexpected error while pushing configuration: {0}" - .format(str(e))) - raise NonRecoverableError(e) - -def _generate_component_name(**kwargs): - """Generate component name""" - service_component_type = kwargs['service_component_type'] - name_override = kwargs['service_component_name_override'] - - kwargs['name'] = name_override if name_override \ - else dis.generate_service_component_name(service_component_type) - return kwargs - -def _done_for_create(**kwargs): - """Wrap up create operation""" - name = kwargs['name'] - kwargs[SERVICE_COMPONENT_NAME] = name - # All updates to the runtime_properties happens here. I don't see a reason - # why we shouldn't do this because the context is not being mutated by - # something else and will keep the other functions pure (pure in the sense - # not dealing with CloudifyContext). - ctx.instance.runtime_properties.update(kwargs) - ctx.logger.info("Done setting up: {0}".format(name)) - return kwargs - - -@merge_inputs_for_create -@monkeypatch_loggers -@Policies.gather_policies_to_node() -@operation -def create_for_components(**create_inputs): - """Create step for Docker containers that are components - - This interface is responible for: - - 1. Generating service component name - 2. Populating config information into Consul - """ - _done_for_create( - **_setup_for_discovery( - **_generate_component_name( - **create_inputs))) - - -def _parse_streams(**kwargs): - """Parse streams and setup for DMaaP plugin""" - # The DMaaP plugin requires this plugin to set the runtime properties - # keyed by the node name. - for stream in kwargs["streams_publishes"]: - kwargs[stream["name"]] = stream - - # NOTE: That the delivery url is constructed and setup in the start operation - for stream in kwargs["streams_subscribes"]: - if stream["type"] == "data_router": - # If either username or password is missing then generate it. The - # DMaaP plugin doesn't generate them for subscribers. - # The code and length of username/password are lifted from the DMaaP - # plugin. - - # Don't want to mutate the source - stream = copy.deepcopy(stream) - if not stream.get("username", None): - stream["username"] = utils.random_string(8) - if not stream.get("password", None): - stream["password"] = utils.random_string(10) - - kwargs[stream["name"]] = stream - - return kwargs - - -def _setup_for_discovery_streams(**kwargs): - """Setup for discovery of streams - - Specifically, there's a race condition this call addresses for data router - subscriber case. The component needs its feed subscriber information but the - DMaaP plugin doesn't provide this until after the docker plugin start - operation. - """ - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - dmaap_kv_key = "{0}:dmaap".format(kwargs["name"]) - conn = dis.create_kv_conn(CONSUL_HOST) - - def add_feed(dr_sub): - # delivery url and subscriber id will be fill by the dmaap plugin later - v = { "location": dr_sub["location"], "delivery_url": None, - "username": dr_sub["username"], "password": dr_sub["password"], - "subscriber_id": None } - return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) - - try: - for dr_sub in dr_subs: - if add_feed(dr_sub) is None: - raise NonRecoverableError( - "Failure updating feed streams in Consul") - except Exception as e: - raise NonRecoverableError(e) - - return kwargs - - -@merge_inputs_for_create -@monkeypatch_loggers -@Policies.gather_policies_to_node() -@operation -def create_for_components_with_streams(**create_inputs): - """Create step for Docker containers that are components that use DMaaP - - This interface is responible for: - - 1. Generating service component name - 2. Setup runtime properties for DMaaP plugin - 3. Populating application config into Consul - 4. Populating DMaaP config for data router subscribers in Consul - """ - _done_for_create( - **_setup_for_discovery( - **_setup_for_discovery_streams( - **_parse_streams( - **_generate_component_name( - **create_inputs))))) - - -@merge_inputs_for_create -@monkeypatch_loggers -@operation -def create_for_platforms(**create_inputs): - """Create step for Docker containers that are platform components - - This interface is responible for: - - 1. Populating config information into Consul - """ - _done_for_create( - **_setup_for_discovery( - **create_inputs)) - - -def _lookup_service(service_component_name, consul_host=CONSUL_HOST, - with_port=False): - conn = dis.create_kv_conn(consul_host) - results = dis.lookup_service(conn, service_component_name) - - if with_port: - # Just grab first - result = results[0] - return "{address}:{port}".format(address=result["ServiceAddress"], - port=result["ServicePort"]) - else: - return results[0]["ServiceAddress"] - - -def _verify_container(service_component_name, max_wait, consul_host=CONSUL_HOST): - """Verify that the container is healthy - - Args: - ----- - max_wait (integer): limit to how may attempts to make which translates to - seconds because each sleep is one second. 0 means infinite. - - Return: - ------- - True if component is healthy else a DockerPluginDeploymentError exception - will be raised. - """ - num_attempts = 1 - - while True: - if dis.is_healthy(consul_host, service_component_name): - return True - else: - num_attempts += 1 - - if max_wait > 0 and max_wait < num_attempts: - raise DockerPluginDeploymentError("Container never became healthy") - - time.sleep(1) - - -def _create_and_start_container(container_name, image, docker_host, - consul_host=CONSUL_HOST, **kwargs): - """Create and start Docker container - - This is the function that actually does more of the heavy lifting including - resolving the docker host to connect and common things to do in setting up - docker containers like making sure CONSUL_HOST gets set as the local docker - host ip. - - This method raises DockerPluginDependencyNotReadyError - """ - try: - # Setup for Docker operations - - docker_host_ip = _lookup_service(docker_host, consul_host=consul_host) - - logins = _get_docker_logins(consul_host=consul_host) - client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) - - hcp = doc.add_host_config_params_volumes(volumes=kwargs.get("volumes", - None)) - hcp = doc.add_host_config_params_ports(ports=kwargs.get("ports", None), - host_config_params=hcp) - hcp = doc.add_host_config_params_dns(docker_host_ip, - host_config_params=hcp) - - # NOTE: The critical env variable CONSUL_HOST is being assigned the - # docker host ip itself because there should be a local Consul agent. We - # want services to register with their local Consul agent. - # CONFIG_BINDING_SERVICE is here for backwards compatibility. This is a - # well-known name now. - platform_envs = { "CONSUL_HOST": docker_host_ip, - "CONFIG_BINDING_SERVICE": "config_binding_service" } - # NOTE: The order of the envs being passed in is **important**. The - # kwargs["envs"] getting passed in last ensures that manual overrides - # will override the hardcoded envs. - envs = doc.create_envs(container_name, platform_envs, kwargs.get("envs", {})) - - # Do Docker operations - - container = doc.create_container(client, image, container_name, envs, hcp) - container_id = doc.start_container(client, container) - - return container_id - except (doc.DockerConnectionError, dis.DiscoveryConnectionError, - dis.DiscoveryServiceNotFoundError) as e: - raise DockerPluginDependencyNotReadyError(e) - - -def _parse_cloudify_context(**kwargs): - """Parse Cloudify context - - Extract what is needed. This is impure function because it requires ctx. - """ - kwargs["deployment_id"] = ctx.deployment.id - return kwargs - -def _enhance_docker_params(**kwargs): - """Setup Docker envs""" - docker_config = kwargs.get("docker_config", {}) - - envs = kwargs.get("envs", {}) - # NOTE: Healthchecks are optional until prepared to handle use cases that - # don't necessarily use http - envs_healthcheck = doc.create_envs_healthcheck(docker_config) \ - if "healthcheck" in docker_config else {} - envs.update(envs_healthcheck) - - # Set tags on this component for its Consul registration as a service - tags = [kwargs.get("deployment_id", None), kwargs["service_id"]] - tags = [ str(tag) for tag in tags if tag is not None ] - # Registrator will use this to register this component with tags. Must be - # comma delimited. - envs["SERVICE_TAGS"] = ",".join(tags) - - kwargs["envs"] = envs - - def combine_params(key, docker_config, kwargs): - v = docker_config.get(key, []) + kwargs.get(key, []) - if v: - kwargs[key] = v - return kwargs - - # Add the lists of ports and volumes unintelligently - meaning just add the - # lists together with no deduping. - kwargs = combine_params("ports", docker_config, kwargs) - kwargs = combine_params("volumes", docker_config, kwargs) - - return kwargs - -def _create_and_start_component(**kwargs): - """Create and start component (container)""" - image = kwargs["image"] - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - docker_host = kwargs[SELECTED_CONTAINER_DESTINATION] - # Need to be picky and manually select out pieces because just using kwargs - # which contains everything confused the execution of - # _create_and_start_container because duplicate variables exist - sub_kwargs = { "volumes": kwargs.get("volumes", []), - "ports": kwargs.get("ports", None), "envs": kwargs.get("envs", {}) } - - container_id = _create_and_start_container(service_component_name, image, - docker_host, **sub_kwargs) - kwargs[CONTAINER_ID] = container_id - - # TODO: Use regular logging here - ctx.logger.info("Container started: {0}, {1}".format(container_id, - service_component_name)) - - return kwargs - -def _verify_component(**kwargs): - """Verify component (container) is healthy""" - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - # TODO: "Consul doesn't make its first health check immediately upon registration. - # Instead it waits for the health check interval to pass." - # Possible enhancement is to read the interval (and possibly the timeout) from - # docker_config and multiply that by a number to come up with a more suitable - # max_wait. - max_wait = kwargs.get("max_wait", 300) - - # Verify that the container is healthy - - if _verify_container(service_component_name, max_wait): - container_id = kwargs[CONTAINER_ID] - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - - # TODO: Use regular logging here - ctx.logger.info("Container is healthy: {0}, {1}".format(container_id, - service_component_name)) - - return kwargs - -def _done_for_start(**kwargs): - ctx.instance.runtime_properties.update(kwargs) - ctx.logger.info("Done starting: {0}".format(kwargs["name"])) - return kwargs - -@wrap_error_handling_start -@merge_inputs_for_start -@monkeypatch_loggers -@operation -def create_and_start_container_for_components(**start_inputs): - """Create Docker container and start for components - - This operation method is to be used with the DockerContainerForComponents - node type. After launching the container, the plugin will verify with Consul - that the app is up and healthy before terminating. - """ - _done_for_start( - **_verify_component( - **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs))))) - - -def _update_delivery_url(**kwargs): - """Update the delivery url for data router subscribers""" - dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \ - if s["type"] == "data_router"] - - if dr_subs: - service_component_name = kwargs[SERVICE_COMPONENT_NAME] - # TODO: Should NOT be setting up the delivery url with ip addresses - # because in the https case, this will not work because data router does - # a certificate validation using the fqdn. - subscriber_host = _lookup_service(service_component_name, with_port=True) - - for dr_sub in dr_subs: - scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME - if "route" not in dr_sub: - raise NonRecoverableError("'route' key missing from data router subscriber") - path = dr_sub["route"] - dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format( - scheme=scheme, host=subscriber_host, path=path) - kwargs[dr_sub["name"]] = dr_sub - - return kwargs - -@wrap_error_handling_start -@merge_inputs_for_start -@monkeypatch_loggers -@operation -def create_and_start_container_for_components_with_streams(**start_inputs): - """Create Docker container and start for components that have streams - - This operation method is to be used with the DockerContainerForComponents - node type. After launching the container, the plugin will verify with Consul - that the app is up and healthy before terminating. - """ - _done_for_start( - **_update_delivery_url( - **_verify_component( - **_create_and_start_component( - **_enhance_docker_params( - **_parse_cloudify_context(**start_inputs)))))) - - -@wrap_error_handling_start -@monkeypatch_loggers -@operation -def create_and_start_container_for_platforms(**kwargs): - """Create Docker container and start for platform services - - This operation method is to be used with the DockerContainerForPlatforms - node type. After launching the container, the plugin will verify with Consul - that the app is up and healthy before terminating. - """ - image = ctx.node.properties["image"] - docker_config = ctx.node.properties.get("docker_config", {}) - service_component_name = ctx.node.properties["name"] - - docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION] - - envs = kwargs.get("envs", {}) - # NOTE: Healthchecks are optional until prepared to handle use cases that - # don't necessarily use http - envs_healthcheck = doc.create_envs_healthcheck(docker_config) \ - if "healthcheck" in docker_config else {} - envs.update(envs_healthcheck) - kwargs["envs"] = envs - - host_port = ctx.node.properties["host_port"] - container_port = ctx.node.properties["container_port"] - - # Cloudify properties are all required and Cloudify complains that None - # is not a valid type for integer. Defaulting to 0 to indicate to not - # use this and not to set a specific port mapping in cases like service - # change handler. - if host_port != 0 and container_port != 0: - # Doing this because other nodes might want to use this property - port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port) - ports = kwargs.get("ports", []) + [ port_mapping ] - kwargs["ports"] = ports - if "ports" not in kwargs: - ctx.logger.warn("No port mappings defined. Will randomly assign port.") - - container_id = _create_and_start_container(service_component_name, image, - docker_host, **kwargs) - ctx.instance.runtime_properties[CONTAINER_ID] = container_id - - ctx.logger.info("Container started: {0}, {1}".format(container_id, - service_component_name)) - - # Verify that the container is healthy - - max_wait = kwargs.get("max_wait", 300) - - if _verify_container(service_component_name, max_wait): - ctx.logger.info("Container is healthy: {0}, {1}".format(container_id, - service_component_name)) - - -@wrap_error_handling_start -@monkeypatch_loggers -@operation -def create_and_start_container(**kwargs): - """Create Docker container and start""" - service_component_name = ctx.node.properties["name"] - ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name - - image = ctx.node.properties["image"] - docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION] - - container_id = _create_and_start_container(service_component_name, image, - docker_host, **kwargs) - ctx.instance.runtime_properties[CONTAINER_ID] = container_id - - ctx.logger.info("Container started: {0}, {1}".format(container_id, - service_component_name)) - - -@monkeypatch_loggers -@operation -def stop_and_remove_container(**kwargs): - """Stop and remove Docker container""" - try: - docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION] - - docker_host_ip = _lookup_service(docker_host) - - logins = _get_docker_logins() - client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) - - container_id = ctx.instance.runtime_properties[CONTAINER_ID] - doc.stop_then_remove_container(client, container_id) - - cleanup_image = kwargs.get("cleanup_image", False) - - if cleanup_image: - image = ctx.node.properties["image"] - - if doc.remove_image(client, image): - ctx.logger.info("Removed Docker image: {0}".format(image)) - else: - ctx.logger.warn("Couldnot remove Docker image: {0}".format(image)) - except (doc.DockerConnectionError, dis.DiscoveryConnectionError, - dis.DiscoveryServiceNotFoundError) as e: - raise RecoverableError(e) - except Exception as e: - ctx.logger.error("Unexpected error while stopping container: {0}" - .format(str(e))) - raise NonRecoverableError(e) - -@monkeypatch_loggers -@Policies.cleanup_policies_on_node -@operation -def cleanup_discovery(**kwargs): - """Delete configuration from Consul""" - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] - - try: - conn = dis.create_kv_conn(CONSUL_HOST) - dis.remove_service_component_config(conn, service_component_name) - except dis.DiscoveryConnectionError as e: - raise RecoverableError(e) - - -def _notify_container(**kwargs): - """Notify container using the policy section in the docker_config""" - dc = kwargs["docker_config"] - - if "policy" in dc: - if dc["policy"]["trigger_type"] == "docker": - # REVIEW: Need to finalize on the docker config policy data structure - script_path = dc["policy"]["script_path"] - updated_policies = kwargs["updated_policies"] - removed_policies = kwargs["removed_policies"] - policies = kwargs["policies"] - cmd = doc.build_policy_update_cmd(script_path, use_sh=False, - msg_type="policies", - updated_policies=updated_policies, - removed_policies=removed_policies, - policies=policies - ) - - docker_host = kwargs[SELECTED_CONTAINER_DESTINATION] - docker_host_ip = _lookup_service(docker_host) - logins = _get_docker_logins() - client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) - - container_id = kwargs["container_id"] - - doc.notify_for_policy_update(client, container_id, cmd) - # else the default is no trigger - - return kwargs - -@monkeypatch_loggers -@Policies.update_policies_on_node() -@operation -def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs): - """Policy update task - - This method is responsible for updating the application configuration and - notifying the applications that the change has occurred. This is to be used - for the dcae.interfaces.policy.policy_update operation. - - :updated_policies: contains the list of changed policy-configs when configs_only=True - (default) Use configs_only=False to bring the full policy objects in :updated_policies:. - """ - update_inputs = copy.deepcopy(ctx.instance.runtime_properties) - update_inputs["updated_policies"] = updated_policies - update_inputs["removed_policies"] = removed_policies - update_inputs["policies"] = policies - - _notify_container(**update_inputs) - - -# Lifecycle interface calls for dcae.nodes.DockerHost - - -@monkeypatch_loggers -@operation -def select_docker_host(**kwargs): - selected_docker_host = ctx.node.properties['docker_host_override'] - name_search = ctx.node.properties['name_search'] - location_id = ctx.node.properties['location_id'] - - if selected_docker_host: - ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host - ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host)) - else: - try: - conn = dis.create_kv_conn(CONSUL_HOST) - names = dis.search_services(conn, name_search, [location_id]) - ctx.logger.info("Docker hosts found: {0}".format(names)) - # Randomly choose one - ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = random.choice(names) - except (dis.DiscoveryConnectionError, dis.DiscoveryServiceNotFoundError) as e: - raise RecoverableError(e) - except Exception as e: - raise NonRecoverableError(e) - -@operation -def unselect_docker_host(**kwargs): - del ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] - ctx.logger.info("Unselected Docker host") - diff --git a/docker/dockerplugin/utils.py b/docker/dockerplugin/utils.py deleted file mode 100644 index 6475aaa..0000000 --- a/docker/dockerplugin/utils.py +++ /dev/null @@ -1,44 +0,0 @@ -# ============LICENSE_START======================================================= -# org.onap.dcae -# ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. -# Copyright (c) 2019 Pantheon.tech. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -import string -import random -import collections - - -def random_string(n): - """Random generate an ascii string of "n" length""" - corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits - return ''.join(random.choice(corpus) for x in range(n)) - - -def update_dict(d, u): - """Recursively updates dict - - Update dict d with dict u - """ - for k, v in u.items(): - if isinstance(v, collections.Mapping): - r = update_dict(d.get(k, {}), v) - d[k] = r - else: - d[k] = u[k] - return d |