summaryrefslogtreecommitdiffstats
path: root/docker/dockerplugin
diff options
context:
space:
mode:
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r--docker/dockerplugin/discovery.py50
-rw-r--r--docker/dockerplugin/tasks.py45
2 files changed, 79 insertions, 16 deletions
diff --git a/docker/dockerplugin/discovery.py b/docker/dockerplugin/discovery.py
index 32a8cd0..03a51f6 100644
--- a/docker/dockerplugin/discovery.py
+++ b/docker/dockerplugin/discovery.py
@@ -47,19 +47,17 @@ def _wrap_consul_call(consul_func, *args, **kwargs):
raise DiscoveryConnectionError(e)
-def generate_service_component_name(service_component_type, service_id, location_id):
+def generate_service_component_name(service_component_type):
"""Generate service component id used to pass into the service component
instance and used as the key to the service component configuration.
Format:
- <service component id>.<service component type>.<service id>.<location id>.dcae.com
-
- TODO: The format will evolve.
+ <service component id>_<service component type>
"""
# Random generated
- service_component_id = str(uuid.uuid4())
- return "{0}.{1}.{2}.{3}.dcae.com".format(
- service_component_id, service_component_type, service_id, location_id)
+ # Copied from cdap plugin
+ return "{0}_{1}".format(str(uuid.uuid4()).replace("-",""),
+ service_component_type)
def create_kv_conn(host):
@@ -204,3 +202,41 @@ def add_to_entry(conn, key, add_name, add_value):
updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
if updated:
return v
+
+
+def _find_matching_services(services, name_search, tags):
+ """Find matching services given search criteria"""
+ def is_match(service):
+ srv_name, srv_tags = service
+ return name_search in srv_name and \
+ all(map(lambda tag: tag in srv_tags, tags))
+
+ return [ srv[0] for srv in services.items() if is_match(srv) ]
+
+def search_services(conn, name_search, tags):
+ """Search for services that match criteria
+
+ Args:
+ -----
+ name_search: (string) Name to search for as a substring
+ tags: (list) List of strings that are tags. A service must match **all** the
+ tags in the list.
+
+ Retruns:
+ --------
+ List of names of services that matched
+ """
+ # srvs is dict where key is service name and value is list of tags
+ catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services)
+ index, srvs = catalog_get_services_func()
+
+ if srvs:
+ matches = _find_matching_services(srvs, name_search, tags)
+
+ if matches:
+ return matches
+
+ raise DiscoveryServiceNotFoundError(
+ "No matches found: {0}, {1}".format(name_search, tags))
+ else:
+ raise DiscoveryServiceNotFoundError("No services found")
diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py
index a41f143..837c1e9 100644
--- a/docker/dockerplugin/tasks.py
+++ b/docker/dockerplugin/tasks.py
@@ -20,7 +20,7 @@
# Lifecycle interface calls for DockerContainer
-import json, time, copy
+import json, time, copy, random
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError, RecoverableError
@@ -71,14 +71,10 @@ def _setup_for_discovery(**kwargs):
def _generate_component_name(**kwargs):
"""Generate component name"""
service_component_type = kwargs['service_component_type']
- service_id = kwargs['service_id']
- location_id = kwargs['location_id']
-
name_override = kwargs['service_component_name_override']
kwargs['name'] = name_override if name_override \
- else dis.generate_service_component_name(service_component_type,
- service_id, location_id)
+ else dis.generate_service_component_name(service_component_type)
return kwargs
def _done_for_create(**kwargs):
@@ -297,6 +293,14 @@ def _create_and_start_container(container_name, image, docker_host,
raise DockerPluginDependencyNotReadyError(e)
+def _parse_cloudify_context(**kwargs):
+ """Parse Cloudify context
+
+ Extract what is needed. This is impure function because it requires ctx.
+ """
+ kwargs["deployment_id"] = ctx.deployment.id
+ return kwargs
+
def _enhance_docker_params(**kwargs):
"""Setup Docker envs"""
docker_config = kwargs.get("docker_config", {})
@@ -307,6 +311,14 @@ def _enhance_docker_params(**kwargs):
envs_healthcheck = doc.create_envs_healthcheck(docker_config) \
if "healthcheck" in docker_config else {}
envs.update(envs_healthcheck)
+
+ # Set tags on this component for its Consul registration as a service
+ tags = [kwargs.get("deployment_id", None), kwargs["service_id"]]
+ tags = [ str(tag) for tag in tags if tag is not None ]
+ # Registrator will use this to register this component with tags. Must be
+ # comma delimited.
+ envs["SERVICE_TAGS"] = ",".join(tags)
+
kwargs["envs"] = envs
def combine_params(key, docker_config, kwargs):
@@ -384,7 +396,8 @@ def create_and_start_container_for_components(**start_inputs):
_done_for_start(
**_verify_component(
**_create_and_start_component(
- **_enhance_docker_params(**start_inputs))))
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs)))))
def _update_delivery_url(**kwargs):
@@ -423,7 +436,8 @@ def create_and_start_container_for_components_with_streams(**start_inputs):
**_update_delivery_url(
**_verify_component(
**_create_and_start_component(
- **_enhance_docker_params(**start_inputs)))))
+ **_enhance_docker_params(
+ **_parse_cloudify_context(**start_inputs))))))
@wrap_error_handling_start
@@ -546,15 +560,28 @@ def cleanup_discovery(**kwargs):
# Lifecycle interface calls for dcae.nodes.DockerHost
+
+@monkeypatch_loggers
@operation
def select_docker_host(**kwargs):
selected_docker_host = ctx.node.properties['docker_host_override']
+ name_search = ctx.node.properties['name_search']
+ location_id = ctx.node.properties['location_id']
if selected_docker_host:
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host
ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host))
else:
- raise NonRecoverableError("Failed to find a suitable Docker host")
+ try:
+ conn = dis.create_kv_conn(CONSUL_HOST)
+ names = dis.search_services(conn, name_search, [location_id])
+ ctx.logger.info("Docker hosts found: {0}".format(names))
+ # Randomly choose one
+ ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = random.choice(names)
+ except (dis.DiscoveryConnectionError, dis.DiscoveryServiceNotFoundError) as e:
+ raise RecoverableError(e)
+ except Exception as e:
+ raise NonRecoverableError(e)
@operation
def unselect_docker_host(**kwargs):