summaryrefslogtreecommitdiffstats
path: root/docker/dockerplugin
diff options
context:
space:
mode:
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r--docker/dockerplugin/__init__.py31
-rw-r--r--docker/dockerplugin/decorators.py102
-rw-r--r--docker/dockerplugin/discovery.py257
-rw-r--r--docker/dockerplugin/exceptions.py29
-rw-r--r--docker/dockerplugin/tasks.py672
-rw-r--r--docker/dockerplugin/utils.py44
6 files changed, 0 insertions, 1135 deletions
diff --git a/docker/dockerplugin/__init__.py b/docker/dockerplugin/__init__.py
deleted file mode 100644
index 669e196..0000000
--- a/docker/dockerplugin/__init__.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-# REVIEW: Tried to source the version from here but you run into import issues
-# because "tasks" module is loaded. This method seems to be the PEP 396
-# recommended way and is listed #3 here https://packaging.python.org/single_source_version/
-# __version__ = '0.1.0'
-
-from .tasks import create_for_components, create_for_components_with_streams, \
- create_and_start_container_for_components_with_streams, \
- create_for_platforms, create_and_start_container, \
- create_and_start_container_for_components, create_and_start_container_for_platforms, \
- stop_and_remove_container, cleanup_discovery, select_docker_host, unselect_docker_host, \
- policy_update
diff --git a/docker/dockerplugin/decorators.py b/docker/dockerplugin/decorators.py
deleted file mode 100644
index f83263b..0000000
--- a/docker/dockerplugin/decorators.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import copy
-from cloudify import ctx
-from cloudify.exceptions import NonRecoverableError, RecoverableError
-from dockering import utils as doc
-from dockerplugin import discovery as dis
-from dockerplugin.exceptions import DockerPluginDeploymentError, \
- DockerPluginDependencyNotReadyError
-from dockerplugin import utils
-
-
-def monkeypatch_loggers(task_func):
- """Sets up the dependent loggers"""
-
- def wrapper(**kwargs):
- # Ouch! Monkeypatch loggers
- doc.logger = ctx.logger
- dis.logger = ctx.logger
-
- return task_func(**kwargs)
-
- return wrapper
-
-
-def wrap_error_handling_start(task_start_func):
- """Wrap error handling for the start operations"""
-
- def wrapper(**kwargs):
- try:
- return task_start_func(**kwargs)
- except DockerPluginDependencyNotReadyError as e:
- # You are here because things we need like a working docker host is not
- # available yet so let Cloudify try again later.
- raise RecoverableError(e)
- except DockerPluginDeploymentError as e:
- # Container failed to come up in the allotted time. This is deemed
- # non-recoverable.
- raise NonRecoverableError(e)
- except Exception as e:
- ctx.logger.error("Unexpected error while starting container: {0}"
- .format(str(e)))
- raise NonRecoverableError(e)
-
- return wrapper
-
-
-def _wrapper_merge_inputs(task_func, properties, **kwargs):
- """Merge Cloudify properties with input kwargs before calling task func"""
- inputs = copy.deepcopy(properties)
- # Recursively update
- utils.update_dict(inputs, kwargs)
-
- # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext
- # This has to be removed and not copied into runtime_properties else you get
- # JSON serialization errors.
- if "ctx" in inputs:
- del inputs["ctx"]
-
- return task_func(**inputs)
-
-def merge_inputs_for_create(task_create_func):
- """Merge all inputs for start operation into one dict"""
-
- # Needed to wrap the wrapper because I was seeing issues with
- # "RuntimeError: No context set in current execution thread"
- def wrapper(**kwargs):
- # NOTE: ctx.node.properties is an ImmutableProperties instance which is
- # why it is passed into a mutable dict so that it can be deep copied
- return _wrapper_merge_inputs(task_create_func,
- dict(ctx.node.properties), **kwargs)
-
- return wrapper
-
-def merge_inputs_for_start(task_start_func):
- """Merge all inputs for start operation into one dict"""
-
- # Needed to wrap the wrapper because I was seeing issues with
- # "RuntimeError: No context set in current execution thread"
- def wrapper(**kwargs):
- return _wrapper_merge_inputs(task_start_func,
- ctx.instance.runtime_properties, **kwargs)
-
- return wrapper
diff --git a/docker/dockerplugin/discovery.py b/docker/dockerplugin/discovery.py
deleted file mode 100644
index 563693c..0000000
--- a/docker/dockerplugin/discovery.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-from functools import partial
-import json
-import logging
-import uuid
-import requests
-import consul
-
-
-logger = logging.getLogger("discovery")
-
-
-class DiscoveryError(RuntimeError):
- pass
-
-class DiscoveryConnectionError(RuntimeError):
- pass
-
-class DiscoveryServiceNotFoundError(RuntimeError):
- pass
-
-class DiscoveryKVEntryNotFoundError(RuntimeError):
- pass
-
-
-def _wrap_consul_call(consul_func, *args, **kwargs):
- """Wrap Consul call to map errors"""
- try:
- return consul_func(*args, **kwargs)
- except requests.exceptions.ConnectionError as e:
- raise DiscoveryConnectionError(e)
-
-
-def generate_service_component_name(service_component_type):
- """Generate service component id used to pass into the service component
- instance and used as the key to the service component configuration.
-
- Format:
- <service component id>_<service component type>
- """
- # Random generated
- # Copied from cdap plugin
- return "{0}_{1}".format(str(uuid.uuid4()).replace("-",""),
- service_component_type)
-
-
-def create_kv_conn(host):
- """Create connection to key-value store
-
- Returns a Consul client to the specified Consul host"""
- try:
- [hostname, port] = host.split(":")
- return consul.Consul(host=hostname, port=int(port))
- except ValueError as e:
- return consul.Consul(host=host)
-
-def push_service_component_config(kv_conn, service_component_name, config):
- config_string = config if isinstance(config, str) else json.dumps(config)
- kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
-
- if kv_put_func(service_component_name, config_string):
- logger.info("Added config for {0}".format(service_component_name))
- else:
- raise DiscoveryError("Failed to push configuration")
-
-def remove_service_component_config(kv_conn, service_component_name):
- kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
- kv_delete_func(service_component_name)
-
-
-def get_kv_value(kv_conn, key):
- """Get a key-value entry's value from Consul
-
- Raises DiscoveryKVEntryNotFoundError if entry not found
- """
- kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
- (index, val) = kv_get_func(key)
-
- if val:
- return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
- else:
- raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key))
-
-
-def _create_rel_key(service_component_name):
- return "{0}:rel".format(service_component_name)
-
-def store_relationship(kv_conn, source_name, target_name):
- # TODO: Rel entry may already exist in a one-to-many situation. Need to
- # support that.
- rel_key = _create_rel_key(source_name)
- rel_value = [target_name] if target_name else []
-
- kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
- kv_put_func(rel_key, json.dumps(rel_value))
- logger.info("Added relationship for {0}".format(rel_key))
-
-def delete_relationship(kv_conn, service_component_name):
- rel_key = _create_rel_key(service_component_name)
- kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
- index, rels = kv_get_func(rel_key)
-
- if rels:
- rels = json.loads(rels["Value"].decode("utf-8"))
- kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
- kv_delete_func(rel_key)
- return rels
- else:
- return []
-
-def lookup_service(kv_conn, service_component_name):
- catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service)
- index, results = catalog_get_func(service_component_name)
-
- if results:
- return results
- else:
- raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name))
-
-
-# TODO: Note these functions have been (for the most part) shamelessly lifted from
-# dcae-cli and should really be shared.
-
-def _is_healthy_pure(get_health_func, instance):
- """Checks to see if a component instance is running healthy
-
- Pure function edition
-
- Args
- ----
- get_health_func: func(string) -> complex object
- Look at unittests in test_discovery to see examples
- instance: (string) fully qualified name of component instance
-
- Returns
- -------
- True if instance has been found and is healthy else False
- """
- index, resp = get_health_func(instance)
-
- if resp:
- def is_passing(instance):
- return all([check["Status"] == "passing" for check in instance["Checks"]])
-
- return any([is_passing(instance) for instance in resp])
- else:
- return False
-
-def is_healthy(consul_host, instance):
- """Checks to see if a component instance is running healthy
-
- Impure function edition
-
- Args
- ----
- consul_host: (string) host string of Consul
- instance: (string) fully qualified name of component instance
-
- Returns
- -------
- True if instance has been found and is healthy else False
- """
- cons = create_kv_conn(consul_host)
-
- get_health_func = partial(_wrap_consul_call, cons.health.service)
- return _is_healthy_pure(get_health_func, instance)
-
-
-def add_to_entry(conn, key, add_name, add_value):
- """
- Find 'key' in consul.
- Treat its value as a JSON string representing a dict.
- Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
- Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under 'key'.
- Watch out for conflicting concurrent updates.
-
- Example:
- Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
- add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
- should result in the value for key 'xyz:dmaap' in consul being updated to
- '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
- """
- while True: # do until update succeeds
- (index, val) = conn.kv.get(key) # index gives version of key retrieved
-
- if val is None: # no key yet
- vstring = '{}'
- mod_index = 0 # Use 0 as the cas index for initial insertion of the key
- else:
- vstring = val['Value']
- mod_index = val['ModifyIndex']
-
- # Build the updated dict
- # Exceptions just propagate
- v = json.loads(vstring)
- v[add_name] = add_value
- new_vstring = json.dumps(v)
-
- updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
- if updated:
- return v
-
-
-def _find_matching_services(services, name_search, tags):
- """Find matching services given search criteria"""
- tags = set(tags)
- return [srv_name for srv_name in services
- if name_search in srv_name and tags <= set(services[srv_name])]
-
-def search_services(conn, name_search, tags):
- """Search for services that match criteria
-
- Args:
- -----
- name_search: (string) Name to search for as a substring
- tags: (list) List of strings that are tags. A service must match **all** the
- tags in the list.
-
- Retruns:
- --------
- List of names of services that matched
- """
- # srvs is dict where key is service name and value is list of tags
- catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services)
- index, srvs = catalog_get_services_func()
-
- if srvs:
- matches = _find_matching_services(srvs, name_search, tags)
-
- if matches:
- return matches
-
- raise DiscoveryServiceNotFoundError(
- "No matches found: {0}, {1}".format(name_search, tags))
- else:
- raise DiscoveryServiceNotFoundError("No services found")
diff --git a/docker/dockerplugin/exceptions.py b/docker/dockerplugin/exceptions.py
deleted file mode 100644
index 0d8a341..0000000
--- a/docker/dockerplugin/exceptions.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-class DockerPluginDeploymentError(RuntimeError):
- pass
-
-
-class DockerPluginDependencyNotReadyError(RuntimeError):
- """Error to use when something that this plugin depends upon e.g. docker api,
- consul is not ready"""
- pass
-
diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py
deleted file mode 100644
index 8a15319..0000000
--- a/docker/dockerplugin/tasks.py
+++ /dev/null
@@ -1,672 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-# Lifecycle interface calls for DockerContainer
-
-import json, time, copy, random
-from cloudify import ctx
-from cloudify.decorators import operation
-from cloudify.exceptions import NonRecoverableError, RecoverableError
-import dockering as doc
-from onap_dcae_dcaepolicy_lib import Policies
-from dockerplugin import discovery as dis
-from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \
- merge_inputs_for_start, merge_inputs_for_create
-from dockerplugin.exceptions import DockerPluginDeploymentError, \
- DockerPluginDependencyNotReadyError
-from dockerplugin import utils
-
-# TODO: Remove this Docker port hardcoding and query for this port instead
-DOCKER_PORT = 2376
-# Rely on the setup of the cloudify manager host to resolve "consul" for the
-# plugin. NOTE: This variable is not passed to components.
-CONSUL_HOST = "consul"
-
-# Used to construct delivery urls for data router subscribers. Data router in FTL
-# requires https but this author believes that ONAP is to be defaulted to http.
-DEFAULT_SCHEME = "http"
-
-# Property keys
-SERVICE_COMPONENT_NAME = "service_component_name"
-SELECTED_CONTAINER_DESTINATION = "selected_container_destination"
-CONTAINER_ID = "container_id"
-APPLICATION_CONFIG = "application_config"
-
-
-# Utility methods
-
-def _get_docker_logins(consul_host=CONSUL_HOST):
- """Get Docker logins
-
- The assumption is that all Docker logins to be used will be available in
- Consul's key-value store under "docker_plugin/docker_logins" as a list of
- json objects where each object is a single login:
-
- [{ "username": "dcae_dev_ro", "password": "att123ro",
- "registry": "nexus01.research.att.com:18443" }]
- """
- # REVIEW: The error handling may have to be re-examined. The current thought is
- # that the logins *must* be setup even with an empty list otherwise the task
- # will fail (fail fast). One alterative is to pass back empty list upon any
- # issues but this would push potential issues to a later point of the
- # deployment.
- kv_conn = dis.create_kv_conn(consul_host)
- return dis.get_kv_value(kv_conn, "docker_plugin/docker_logins")
-
-
-# Lifecycle interface calls for dcae.nodes.DockerContainer
-
-def _setup_for_discovery(**kwargs):
- """Setup for config discovery"""
- try:
- name = kwargs['name']
- application_config = kwargs[APPLICATION_CONFIG]
-
- # NOTE: application_config is no longer a json string and is inputed as a
- # YAML map which translates to a dict. We don't have to do any
- # preprocessing anymore.
- conn = dis.create_kv_conn(CONSUL_HOST)
- dis.push_service_component_config(conn, name, application_config)
- return kwargs
- except dis.DiscoveryConnectionError as e:
- raise RecoverableError(e)
- except Exception as e:
- ctx.logger.error("Unexpected error while pushing configuration: {0}"
- .format(str(e)))
- raise NonRecoverableError(e)
-
-def _generate_component_name(**kwargs):
- """Generate component name"""
- service_component_type = kwargs['service_component_type']
- name_override = kwargs['service_component_name_override']
-
- kwargs['name'] = name_override if name_override \
- else dis.generate_service_component_name(service_component_type)
- return kwargs
-
-def _done_for_create(**kwargs):
- """Wrap up create operation"""
- name = kwargs['name']
- kwargs[SERVICE_COMPONENT_NAME] = name
- # All updates to the runtime_properties happens here. I don't see a reason
- # why we shouldn't do this because the context is not being mutated by
- # something else and will keep the other functions pure (pure in the sense
- # not dealing with CloudifyContext).
- ctx.instance.runtime_properties.update(kwargs)
- ctx.logger.info("Done setting up: {0}".format(name))
- return kwargs
-
-
-@merge_inputs_for_create
-@monkeypatch_loggers
-@Policies.gather_policies_to_node()
-@operation
-def create_for_components(**create_inputs):
- """Create step for Docker containers that are components
-
- This interface is responible for:
-
- 1. Generating service component name
- 2. Populating config information into Consul
- """
- _done_for_create(
- **_setup_for_discovery(
- **_generate_component_name(
- **create_inputs)))
-
-
-def _parse_streams(**kwargs):
- """Parse streams and setup for DMaaP plugin"""
- # The DMaaP plugin requires this plugin to set the runtime properties
- # keyed by the node name.
- for stream in kwargs["streams_publishes"]:
- kwargs[stream["name"]] = stream
-
- # NOTE: That the delivery url is constructed and setup in the start operation
- for stream in kwargs["streams_subscribes"]:
- if stream["type"] == "data_router":
- # If either username or password is missing then generate it. The
- # DMaaP plugin doesn't generate them for subscribers.
- # The code and length of username/password are lifted from the DMaaP
- # plugin.
-
- # Don't want to mutate the source
- stream = copy.deepcopy(stream)
- if not stream.get("username", None):
- stream["username"] = utils.random_string(8)
- if not stream.get("password", None):
- stream["password"] = utils.random_string(10)
-
- kwargs[stream["name"]] = stream
-
- return kwargs
-
-
-def _setup_for_discovery_streams(**kwargs):
- """Setup for discovery of streams
-
- Specifically, there's a race condition this call addresses for data router
- subscriber case. The component needs its feed subscriber information but the
- DMaaP plugin doesn't provide this until after the docker plugin start
- operation.
- """
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
- conn = dis.create_kv_conn(CONSUL_HOST)
-
- def add_feed(dr_sub):
- # delivery url and subscriber id will be fill by the dmaap plugin later
- v = { "location": dr_sub["location"], "delivery_url": None,
- "username": dr_sub["username"], "password": dr_sub["password"],
- "subscriber_id": None }
- return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v)
-
- try:
- for dr_sub in dr_subs:
- if add_feed(dr_sub) is None:
- raise NonRecoverableError(
- "Failure updating feed streams in Consul")
- except Exception as e:
- raise NonRecoverableError(e)
-
- return kwargs
-
-
-@merge_inputs_for_create
-@monkeypatch_loggers
-@Policies.gather_policies_to_node()
-@operation
-def create_for_components_with_streams(**create_inputs):
- """Create step for Docker containers that are components that use DMaaP
-
- This interface is responible for:
-
- 1. Generating service component name
- 2. Setup runtime properties for DMaaP plugin
- 3. Populating application config into Consul
- 4. Populating DMaaP config for data router subscribers in Consul
- """
- _done_for_create(
- **_setup_for_discovery(
- **_setup_for_discovery_streams(
- **_parse_streams(
- **_generate_component_name(
- **create_inputs)))))
-
-
-@merge_inputs_for_create
-@monkeypatch_loggers
-@operation
-def create_for_platforms(**create_inputs):
- """Create step for Docker containers that are platform components
-
- This interface is responible for:
-
- 1. Populating config information into Consul
- """
- _done_for_create(
- **_setup_for_discovery(
- **create_inputs))
-
-
-def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
- with_port=False):
- conn = dis.create_kv_conn(consul_host)
- results = dis.lookup_service(conn, service_component_name)
-
- if with_port:
- # Just grab first
- result = results[0]
- return "{address}:{port}".format(address=result["ServiceAddress"],
- port=result["ServicePort"])
- else:
- return results[0]["ServiceAddress"]
-
-
-def _verify_container(service_component_name, max_wait, consul_host=CONSUL_HOST):
- """Verify that the container is healthy
-
- Args:
- -----
- max_wait (integer): limit to how may attempts to make which translates to
- seconds because each sleep is one second. 0 means infinite.
-
- Return:
- -------
- True if component is healthy else a DockerPluginDeploymentError exception
- will be raised.
- """
- num_attempts = 1
-
- while True:
- if dis.is_healthy(consul_host, service_component_name):
- return True
- else:
- num_attempts += 1
-
- if max_wait > 0 and max_wait < num_attempts:
- raise DockerPluginDeploymentError("Container never became healthy")
-
- time.sleep(1)
-
-
-def _create_and_start_container(container_name, image, docker_host,
- consul_host=CONSUL_HOST, **kwargs):
- """Create and start Docker container
-
- This is the function that actually does more of the heavy lifting including
- resolving the docker host to connect and common things to do in setting up
- docker containers like making sure CONSUL_HOST gets set as the local docker
- host ip.
-
- This method raises DockerPluginDependencyNotReadyError
- """
- try:
- # Setup for Docker operations
-
- docker_host_ip = _lookup_service(docker_host, consul_host=consul_host)
-
- logins = _get_docker_logins(consul_host=consul_host)
- client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
-
- hcp = doc.add_host_config_params_volumes(volumes=kwargs.get("volumes",
- None))
- hcp = doc.add_host_config_params_ports(ports=kwargs.get("ports", None),
- host_config_params=hcp)
- hcp = doc.add_host_config_params_dns(docker_host_ip,
- host_config_params=hcp)
-
- # NOTE: The critical env variable CONSUL_HOST is being assigned the
- # docker host ip itself because there should be a local Consul agent. We
- # want services to register with their local Consul agent.
- # CONFIG_BINDING_SERVICE is here for backwards compatibility. This is a
- # well-known name now.
- platform_envs = { "CONSUL_HOST": docker_host_ip,
- "CONFIG_BINDING_SERVICE": "config_binding_service" }
- # NOTE: The order of the envs being passed in is **important**. The
- # kwargs["envs"] getting passed in last ensures that manual overrides
- # will override the hardcoded envs.
- envs = doc.create_envs(container_name, platform_envs, kwargs.get("envs", {}))
-
- # Do Docker operations
-
- container = doc.create_container(client, image, container_name, envs, hcp)
- container_id = doc.start_container(client, container)
-
- return container_id
- except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
- dis.DiscoveryServiceNotFoundError) as e:
- raise DockerPluginDependencyNotReadyError(e)
-
-
-def _parse_cloudify_context(**kwargs):
- """Parse Cloudify context
-
- Extract what is needed. This is impure function because it requires ctx.
- """
- kwargs["deployment_id"] = ctx.deployment.id
- return kwargs
-
-def _enhance_docker_params(**kwargs):
- """Setup Docker envs"""
- docker_config = kwargs.get("docker_config", {})
-
- envs = kwargs.get("envs", {})
- # NOTE: Healthchecks are optional until prepared to handle use cases that
- # don't necessarily use http
- envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
- if "healthcheck" in docker_config else {}
- envs.update(envs_healthcheck)
-
- # Set tags on this component for its Consul registration as a service
- tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
- tags = [ str(tag) for tag in tags if tag is not None ]
- # Registrator will use this to register this component with tags. Must be
- # comma delimited.
- envs["SERVICE_TAGS"] = ",".join(tags)
-
- kwargs["envs"] = envs
-
- def combine_params(key, docker_config, kwargs):
- v = docker_config.get(key, []) + kwargs.get(key, [])
- if v:
- kwargs[key] = v
- return kwargs
-
- # Add the lists of ports and volumes unintelligently - meaning just add the
- # lists together with no deduping.
- kwargs = combine_params("ports", docker_config, kwargs)
- kwargs = combine_params("volumes", docker_config, kwargs)
-
- return kwargs
-
-def _create_and_start_component(**kwargs):
- """Create and start component (container)"""
- image = kwargs["image"]
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
- # Need to be picky and manually select out pieces because just using kwargs
- # which contains everything confused the execution of
- # _create_and_start_container because duplicate variables exist
- sub_kwargs = { "volumes": kwargs.get("volumes", []),
- "ports": kwargs.get("ports", None), "envs": kwargs.get("envs", {}) }
-
- container_id = _create_and_start_container(service_component_name, image,
- docker_host, **sub_kwargs)
- kwargs[CONTAINER_ID] = container_id
-
- # TODO: Use regular logging here
- ctx.logger.info("Container started: {0}, {1}".format(container_id,
- service_component_name))
-
- return kwargs
-
-def _verify_component(**kwargs):
- """Verify component (container) is healthy"""
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- # TODO: "Consul doesn't make its first health check immediately upon registration.
- # Instead it waits for the health check interval to pass."
- # Possible enhancement is to read the interval (and possibly the timeout) from
- # docker_config and multiply that by a number to come up with a more suitable
- # max_wait.
- max_wait = kwargs.get("max_wait", 300)
-
- # Verify that the container is healthy
-
- if _verify_container(service_component_name, max_wait):
- container_id = kwargs[CONTAINER_ID]
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
-
- # TODO: Use regular logging here
- ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
- service_component_name))
-
- return kwargs
-
-def _done_for_start(**kwargs):
- ctx.instance.runtime_properties.update(kwargs)
- ctx.logger.info("Done starting: {0}".format(kwargs["name"]))
- return kwargs
-
-@wrap_error_handling_start
-@merge_inputs_for_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_components(**start_inputs):
- """Create Docker container and start for components
-
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
- """
- _done_for_start(
- **_verify_component(
- **_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs)))))
-
-
-def _update_delivery_url(**kwargs):
- """Update the delivery url for data router subscribers"""
- dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
- if s["type"] == "data_router"]
-
- if dr_subs:
- service_component_name = kwargs[SERVICE_COMPONENT_NAME]
- # TODO: Should NOT be setting up the delivery url with ip addresses
- # because in the https case, this will not work because data router does
- # a certificate validation using the fqdn.
- subscriber_host = _lookup_service(service_component_name, with_port=True)
-
- for dr_sub in dr_subs:
- scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
- if "route" not in dr_sub:
- raise NonRecoverableError("'route' key missing from data router subscriber")
- path = dr_sub["route"]
- dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
- scheme=scheme, host=subscriber_host, path=path)
- kwargs[dr_sub["name"]] = dr_sub
-
- return kwargs
-
-@wrap_error_handling_start
-@merge_inputs_for_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_components_with_streams(**start_inputs):
- """Create Docker container and start for components that have streams
-
- This operation method is to be used with the DockerContainerForComponents
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
- """
- _done_for_start(
- **_update_delivery_url(
- **_verify_component(
- **_create_and_start_component(
- **_enhance_docker_params(
- **_parse_cloudify_context(**start_inputs))))))
-
-
-@wrap_error_handling_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_platforms(**kwargs):
- """Create Docker container and start for platform services
-
- This operation method is to be used with the DockerContainerForPlatforms
- node type. After launching the container, the plugin will verify with Consul
- that the app is up and healthy before terminating.
- """
- image = ctx.node.properties["image"]
- docker_config = ctx.node.properties.get("docker_config", {})
- service_component_name = ctx.node.properties["name"]
-
- docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]
-
- envs = kwargs.get("envs", {})
- # NOTE: Healthchecks are optional until prepared to handle use cases that
- # don't necessarily use http
- envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
- if "healthcheck" in docker_config else {}
- envs.update(envs_healthcheck)
- kwargs["envs"] = envs
-
- host_port = ctx.node.properties["host_port"]
- container_port = ctx.node.properties["container_port"]
-
- # Cloudify properties are all required and Cloudify complains that None
- # is not a valid type for integer. Defaulting to 0 to indicate to not
- # use this and not to set a specific port mapping in cases like service
- # change handler.
- if host_port != 0 and container_port != 0:
- # Doing this because other nodes might want to use this property
- port_mapping = "{cp}:{hp}".format(cp=container_port, hp=host_port)
- ports = kwargs.get("ports", []) + [ port_mapping ]
- kwargs["ports"] = ports
- if "ports" not in kwargs:
- ctx.logger.warn("No port mappings defined. Will randomly assign port.")
-
- container_id = _create_and_start_container(service_component_name, image,
- docker_host, **kwargs)
- ctx.instance.runtime_properties[CONTAINER_ID] = container_id
-
- ctx.logger.info("Container started: {0}, {1}".format(container_id,
- service_component_name))
-
- # Verify that the container is healthy
-
- max_wait = kwargs.get("max_wait", 300)
-
- if _verify_container(service_component_name, max_wait):
- ctx.logger.info("Container is healthy: {0}, {1}".format(container_id,
- service_component_name))
-
-
-@wrap_error_handling_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container(**kwargs):
- """Create Docker container and start"""
- service_component_name = ctx.node.properties["name"]
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
-
- image = ctx.node.properties["image"]
- docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]
-
- container_id = _create_and_start_container(service_component_name, image,
- docker_host, **kwargs)
- ctx.instance.runtime_properties[CONTAINER_ID] = container_id
-
- ctx.logger.info("Container started: {0}, {1}".format(container_id,
- service_component_name))
-
-
-@monkeypatch_loggers
-@operation
-def stop_and_remove_container(**kwargs):
- """Stop and remove Docker container"""
- try:
- docker_host = ctx.instance.runtime_properties[SELECTED_CONTAINER_DESTINATION]
-
- docker_host_ip = _lookup_service(docker_host)
-
- logins = _get_docker_logins()
- client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
-
- container_id = ctx.instance.runtime_properties[CONTAINER_ID]
- doc.stop_then_remove_container(client, container_id)
-
- cleanup_image = kwargs.get("cleanup_image", False)
-
- if cleanup_image:
- image = ctx.node.properties["image"]
-
- if doc.remove_image(client, image):
- ctx.logger.info("Removed Docker image: {0}".format(image))
- else:
- ctx.logger.warn("Couldnot remove Docker image: {0}".format(image))
- except (doc.DockerConnectionError, dis.DiscoveryConnectionError,
- dis.DiscoveryServiceNotFoundError) as e:
- raise RecoverableError(e)
- except Exception as e:
- ctx.logger.error("Unexpected error while stopping container: {0}"
- .format(str(e)))
- raise NonRecoverableError(e)
-
-@monkeypatch_loggers
-@Policies.cleanup_policies_on_node
-@operation
-def cleanup_discovery(**kwargs):
- """Delete configuration from Consul"""
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
-
- try:
- conn = dis.create_kv_conn(CONSUL_HOST)
- dis.remove_service_component_config(conn, service_component_name)
- except dis.DiscoveryConnectionError as e:
- raise RecoverableError(e)
-
-
-def _notify_container(**kwargs):
- """Notify container using the policy section in the docker_config"""
- dc = kwargs["docker_config"]
-
- if "policy" in dc:
- if dc["policy"]["trigger_type"] == "docker":
- # REVIEW: Need to finalize on the docker config policy data structure
- script_path = dc["policy"]["script_path"]
- updated_policies = kwargs["updated_policies"]
- removed_policies = kwargs["removed_policies"]
- policies = kwargs["policies"]
- cmd = doc.build_policy_update_cmd(script_path, use_sh=False,
- msg_type="policies",
- updated_policies=updated_policies,
- removed_policies=removed_policies,
- policies=policies
- )
-
- docker_host = kwargs[SELECTED_CONTAINER_DESTINATION]
- docker_host_ip = _lookup_service(docker_host)
- logins = _get_docker_logins()
- client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins)
-
- container_id = kwargs["container_id"]
-
- doc.notify_for_policy_update(client, container_id, cmd)
- # else the default is no trigger
-
- return kwargs
-
-@monkeypatch_loggers
-@Policies.update_policies_on_node()
-@operation
-def policy_update(updated_policies, removed_policies=None, policies=None, **kwargs):
- """Policy update task
-
- This method is responsible for updating the application configuration and
- notifying the applications that the change has occurred. This is to be used
- for the dcae.interfaces.policy.policy_update operation.
-
- :updated_policies: contains the list of changed policy-configs when configs_only=True
- (default) Use configs_only=False to bring the full policy objects in :updated_policies:.
- """
- update_inputs = copy.deepcopy(ctx.instance.runtime_properties)
- update_inputs["updated_policies"] = updated_policies
- update_inputs["removed_policies"] = removed_policies
- update_inputs["policies"] = policies
-
- _notify_container(**update_inputs)
-
-
-# Lifecycle interface calls for dcae.nodes.DockerHost
-
-
-@monkeypatch_loggers
-@operation
-def select_docker_host(**kwargs):
- selected_docker_host = ctx.node.properties['docker_host_override']
- name_search = ctx.node.properties['name_search']
- location_id = ctx.node.properties['location_id']
-
- if selected_docker_host:
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host
- ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host))
- else:
- try:
- conn = dis.create_kv_conn(CONSUL_HOST)
- names = dis.search_services(conn, name_search, [location_id])
- ctx.logger.info("Docker hosts found: {0}".format(names))
- # Randomly choose one
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = random.choice(names)
- except (dis.DiscoveryConnectionError, dis.DiscoveryServiceNotFoundError) as e:
- raise RecoverableError(e)
- except Exception as e:
- raise NonRecoverableError(e)
-
-@operation
-def unselect_docker_host(**kwargs):
- del ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME]
- ctx.logger.info("Unselected Docker host")
-
diff --git a/docker/dockerplugin/utils.py b/docker/dockerplugin/utils.py
deleted file mode 100644
index 6475aaa..0000000
--- a/docker/dockerplugin/utils.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# ============LICENSE_START=======================================================
-# org.onap.dcae
-# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
-# Copyright (c) 2019 Pantheon.tech. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-import string
-import random
-import collections
-
-
-def random_string(n):
- """Random generate an ascii string of "n" length"""
- corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits
- return ''.join(random.choice(corpus) for x in range(n))
-
-
-def update_dict(d, u):
- """Recursively updates dict
-
- Update dict d with dict u
- """
- for k, v in u.items():
- if isinstance(v, collections.Mapping):
- r = update_dict(d.get(k, {}), v)
- d[k] = r
- else:
- d[k] = u[k]
- return d