diff options
author | Michael Hwang <mhwang@research.att.com> | 2017-09-06 17:46:45 -0400 |
---|---|---|
committer | Michael Hwang <mhwang@research.att.com> | 2017-09-06 17:50:51 -0400 |
commit | da9cdd6a23d2fa2748795a6c83b14cc4d3fa3d13 (patch) | |
tree | a9e7a8580b992ebec517edb67b8e2e612dd4ecec /docker/dockerplugin | |
parent | fcb0bb5252f8e90ce065e7e6f9034f9d1a4ef8ce (diff) |
Enhance to query Consul for target docker host
* `SelectedDockerHost` actually queries by a name stem and location
* Shorten name
* Tag components with deployment id
Change-Id: I715f1de25fa047ce70eb26a5cc7615cfd3b408e7
Issue-ID: DCAEGEN2-91
Signed-off-by: Michael Hwang <mhwang@research.att.com>
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r-- | docker/dockerplugin/discovery.py | 50 | ||||
-rw-r--r-- | docker/dockerplugin/tasks.py | 45 |
2 files changed, 79 insertions, 16 deletions
diff --git a/docker/dockerplugin/discovery.py b/docker/dockerplugin/discovery.py index 32a8cd0..03a51f6 100644 --- a/docker/dockerplugin/discovery.py +++ b/docker/dockerplugin/discovery.py @@ -47,19 +47,17 @@ def _wrap_consul_call(consul_func, *args, **kwargs): raise DiscoveryConnectionError(e) -def generate_service_component_name(service_component_type, service_id, location_id): +def generate_service_component_name(service_component_type): """Generate service component id used to pass into the service component instance and used as the key to the service component configuration. Format: - <service component id>.<service component type>.<service id>.<location id>.dcae.com - - TODO: The format will evolve. + <service component id>_<service component type> """ # Random generated - service_component_id = str(uuid.uuid4()) - return "{0}.{1}.{2}.{3}.dcae.com".format( - service_component_id, service_component_type, service_id, location_id) + # Copied from cdap plugin + return "{0}_{1}".format(str(uuid.uuid4()).replace("-",""), + service_component_type) def create_kv_conn(host): @@ -204,3 +202,41 @@ def add_to_entry(conn, key, add_name, add_value): updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false if updated: return v + + +def _find_matching_services(services, name_search, tags): + """Find matching services given search criteria""" + def is_match(service): + srv_name, srv_tags = service + return name_search in srv_name and \ + all(map(lambda tag: tag in srv_tags, tags)) + + return [ srv[0] for srv in services.items() if is_match(srv) ] + +def search_services(conn, name_search, tags): + """Search for services that match criteria + + Args: + ----- + name_search: (string) Name to search for as a substring + tags: (list) List of strings that are tags. A service must match **all** the + tags in the list. + + Retruns: + -------- + List of names of services that matched + """ + # srvs is dict where key is service name and value is list of tags + catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services) + index, srvs = catalog_get_services_func() + + if srvs: + matches = _find_matching_services(srvs, name_search, tags) + + if matches: + return matches + + raise DiscoveryServiceNotFoundError( + "No matches found: {0}, {1}".format(name_search, tags)) + else: + raise DiscoveryServiceNotFoundError("No services found") diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py index a41f143..837c1e9 100644 --- a/docker/dockerplugin/tasks.py +++ b/docker/dockerplugin/tasks.py @@ -20,7 +20,7 @@ # Lifecycle interface calls for DockerContainer -import json, time, copy +import json, time, copy, random from cloudify import ctx from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError, RecoverableError @@ -71,14 +71,10 @@ def _setup_for_discovery(**kwargs): def _generate_component_name(**kwargs): """Generate component name""" service_component_type = kwargs['service_component_type'] - service_id = kwargs['service_id'] - location_id = kwargs['location_id'] - name_override = kwargs['service_component_name_override'] kwargs['name'] = name_override if name_override \ - else dis.generate_service_component_name(service_component_type, - service_id, location_id) + else dis.generate_service_component_name(service_component_type) return kwargs def _done_for_create(**kwargs): @@ -297,6 +293,14 @@ def _create_and_start_container(container_name, image, docker_host, raise DockerPluginDependencyNotReadyError(e) +def _parse_cloudify_context(**kwargs): + """Parse Cloudify context + + Extract what is needed. This is impure function because it requires ctx. + """ + kwargs["deployment_id"] = ctx.deployment.id + return kwargs + def _enhance_docker_params(**kwargs): """Setup Docker envs""" docker_config = kwargs.get("docker_config", {}) @@ -307,6 +311,14 @@ def _enhance_docker_params(**kwargs): envs_healthcheck = doc.create_envs_healthcheck(docker_config) \ if "healthcheck" in docker_config else {} envs.update(envs_healthcheck) + + # Set tags on this component for its Consul registration as a service + tags = [kwargs.get("deployment_id", None), kwargs["service_id"]] + tags = [ str(tag) for tag in tags if tag is not None ] + # Registrator will use this to register this component with tags. Must be + # comma delimited. + envs["SERVICE_TAGS"] = ",".join(tags) + kwargs["envs"] = envs def combine_params(key, docker_config, kwargs): @@ -384,7 +396,8 @@ def create_and_start_container_for_components(**start_inputs): _done_for_start( **_verify_component( **_create_and_start_component( - **_enhance_docker_params(**start_inputs)))) + **_enhance_docker_params( + **_parse_cloudify_context(**start_inputs))))) def _update_delivery_url(**kwargs): @@ -423,7 +436,8 @@ def create_and_start_container_for_components_with_streams(**start_inputs): **_update_delivery_url( **_verify_component( **_create_and_start_component( - **_enhance_docker_params(**start_inputs))))) + **_enhance_docker_params( + **_parse_cloudify_context(**start_inputs)))))) @wrap_error_handling_start @@ -546,15 +560,28 @@ def cleanup_discovery(**kwargs): # Lifecycle interface calls for dcae.nodes.DockerHost + +@monkeypatch_loggers @operation def select_docker_host(**kwargs): selected_docker_host = ctx.node.properties['docker_host_override'] + name_search = ctx.node.properties['name_search'] + location_id = ctx.node.properties['location_id'] if selected_docker_host: ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = selected_docker_host ctx.logger.info("Selected Docker host: {0}".format(selected_docker_host)) else: - raise NonRecoverableError("Failed to find a suitable Docker host") + try: + conn = dis.create_kv_conn(CONSUL_HOST) + names = dis.search_services(conn, name_search, [location_id]) + ctx.logger.info("Docker hosts found: {0}".format(names)) + # Randomly choose one + ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = random.choice(names) + except (dis.DiscoveryConnectionError, dis.DiscoveryServiceNotFoundError) as e: + raise RecoverableError(e) + except Exception as e: + raise NonRecoverableError(e) @operation def unselect_docker_host(**kwargs): |