diff options
Diffstat (limited to 'docker/dockerplugin')
-rw-r--r-- | docker/dockerplugin/__init__.py | 3 | ||||
-rw-r--r-- | docker/dockerplugin/discovery.py | 17 | ||||
-rw-r--r-- | docker/dockerplugin/tasks.py | 106 |
3 files changed, 117 insertions, 9 deletions
diff --git a/docker/dockerplugin/__init__.py b/docker/dockerplugin/__init__.py index ef1bfec..669e196 100644 --- a/docker/dockerplugin/__init__.py +++ b/docker/dockerplugin/__init__.py @@ -27,4 +27,5 @@ from .tasks import create_for_components, create_for_components_with_streams, \ create_and_start_container_for_components_with_streams, \ create_for_platforms, create_and_start_container, \ create_and_start_container_for_components, create_and_start_container_for_platforms, \ - stop_and_remove_container, cleanup_discovery, select_docker_host, unselect_docker_host + stop_and_remove_container, cleanup_discovery, select_docker_host, unselect_docker_host, \ + policy_update diff --git a/docker/dockerplugin/discovery.py b/docker/dockerplugin/discovery.py index 03a51f6..8361c13 100644 --- a/docker/dockerplugin/discovery.py +++ b/docker/dockerplugin/discovery.py @@ -38,6 +38,9 @@ class DiscoveryConnectionError(RuntimeError): class DiscoveryServiceNotFoundError(RuntimeError): pass +class DiscoveryKVEntryNotFoundError(RuntimeError): + pass + def _wrap_consul_call(consul_func, *args, **kwargs): """Wrap Consul call to map errors""" @@ -84,6 +87,20 @@ def remove_service_component_config(kv_conn, service_component_name): kv_delete_func(service_component_name) +def get_kv_value(kv_conn, key): + """Get a key-value entry's value from Consul + + Raises DiscoveryKVEntryNotFoundError if entry not found + """ + kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get) + (index, val) = kv_get_func(key) + + if val: + return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate + else: + raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key)) + + def _create_rel_key(service_component_name): return "{0}:rel".format(service_component_name) diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py index 837c1e9..e42e47d 100644 --- a/docker/dockerplugin/tasks.py +++ b/docker/dockerplugin/tasks.py @@ -25,6 +25,7 @@ from cloudify import ctx from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError, RecoverableError import dockering as doc +from dcaepolicy import Policies, POLICIES, POLICY_MESSAGE_TYPE from dockerplugin import discovery as dis from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ merge_inputs_for_start @@ -46,6 +47,29 @@ DEFAULT_SCHEME = "http" SERVICE_COMPONENT_NAME = "service_component_name" SELECTED_CONTAINER_DESTINATION = "selected_container_destination" CONTAINER_ID = "container_id" +APPLICATION_CONFIG = "application_config" + + +# Utility methods + +def _get_docker_logins(consul_host=CONSUL_HOST): + """Get Docker logins + + The assumption is that all Docker logins to be used will be available in + Consul's key-value store under "docker_plugin/docker_logins" as a list of + json objects where each object is a single login: + + [{ "username": "dcae_dev_ro", "password": "att123ro", + "registry": "nexus01.research.att.com:18443" }] + """ + # REVIEW: The error handling may have to be re-examined. The current thought is + # that the logins *must* be setup even with an empty list otherwise the task + # will fail (fail fast). One alterative is to pass back empty list upon any + # issues but this would push potential issues to a later point of the + # deployment. + kv_conn = dis.create_kv_conn(consul_host) + return dis.get_kv_value(kv_conn, "docker_plugin/docker_logins") + # Lifecycle interface calls for dcae.nodes.DockerContainer @@ -53,7 +77,7 @@ def _setup_for_discovery(**kwargs): """Setup for config discovery""" try: name = kwargs['name'] - application_config = kwargs['application_config'] + application_config = kwargs[APPLICATION_CONFIG] # NOTE: application_config is no longer a json string and is inputed as a # YAML map which translates to a dict. We don't have to do any @@ -89,8 +113,14 @@ def _done_for_create(**kwargs): ctx.logger.info("Done setting up: {0}".format(name)) return kwargs +def _merge_policy_updates(**kwargs): + app_config = kwargs[APPLICATION_CONFIG] + kwargs[APPLICATION_CONFIG] = Policies.shallow_merge_policies_into(app_config) + return kwargs + @monkeypatch_loggers +@Policies.gather_policies_to_node @operation def create_for_components(**kwargs): """Create step for Docker containers that are components @@ -102,8 +132,9 @@ def create_for_components(**kwargs): """ _done_for_create( **_setup_for_discovery( - **_generate_component_name( - **ctx.node.properties))) + **_merge_policy_updates( + **_generate_component_name( + **ctx.node.properties)))) def _parse_streams(**kwargs): @@ -171,6 +202,7 @@ def _setup_for_discovery_streams(**kwargs): @monkeypatch_loggers +@Policies.gather_policies_to_node @operation def create_for_components_with_streams(**kwargs): """Create step for Docker containers that are components that use DMaaP @@ -185,9 +217,10 @@ def create_for_components_with_streams(**kwargs): _done_for_create( **_setup_for_discovery( **_setup_for_discovery_streams( - **_parse_streams( - **_generate_component_name( - **ctx.node.properties))))) + **_merge_policy_updates( + **_parse_streams( + **_generate_component_name( + **ctx.node.properties)))))) @monkeypatch_loggers @@ -261,7 +294,8 @@ def _create_and_start_container(container_name, image, docker_host, docker_host_ip = _lookup_service(docker_host, consul_host=consul_host) - client = doc.create_client(docker_host_ip, DOCKER_PORT) + logins = _get_docker_logins(consul_host=consul_host) + client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) hcp = doc.add_host_config_params_volumes(volumes=kwargs.get("volumes", None)) @@ -523,7 +557,8 @@ def stop_and_remove_container(**kwargs): docker_host_ip = _lookup_service(docker_host) - client = doc.create_client(docker_host_ip, DOCKER_PORT) + logins = _get_docker_logins() + client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) container_id = ctx.instance.runtime_properties[CONTAINER_ID] doc.stop_then_remove_container(client, container_id) @@ -558,6 +593,61 @@ def cleanup_discovery(**kwargs): raise RecoverableError(e) +def _notify_container(**kwargs): + """Notify container using the policy section in the docker_config""" + dc = kwargs["docker_config"] + + if "policy" in dc: + if dc["policy"]["trigger_type"] == "docker": + # REVIEW: Need to finalize on the docker config policy data structure + script_path = dc["policy"]["script_path"] + app_config = kwargs["application_config"] + updated_policies = kwargs["updated_policies"] + cmd = doc.build_policy_update_cmd(script_path, use_sh=False, + updated_policies=updated_policies, + application_config=app_config) + + docker_host = kwargs[SELECTED_CONTAINER_DESTINATION] + docker_host_ip = _lookup_service(docker_host) + logins = _get_docker_logins() + client = doc.create_client(docker_host_ip, DOCKER_PORT, logins=logins) + + container_id = kwargs["container_id"] + + doc.notify_for_policy_update(client, container_id, cmd) + # else the default is no trigger + + return kwargs + +def _done_for_policy_update(**kwargs): + name = kwargs['name'] + ctx.instance.runtime_properties.update(kwargs) + ctx.logger.info("Done updating for policy: {0}".format(name)) + return kwargs + +@monkeypatch_loggers +@Policies.update_policies_on_node(configs_only=True) +@operation +def policy_update(updated_policies, **kwargs): + """Policy update task + + This method is responsible for updating the application configuration and + notifying the applications that the change has occurred. This is to be used + for the dcae.interfaces.policy.policy_update operation. + + :updated_policies: contains the list of changed policy-configs when configs_only=True + (default) Use configs_only=False to bring the full policy objects in :updated_policies:. + """ + update_inputs = copy.deepcopy(ctx.instance.runtime_properties) + update_inputs["updated_policies"] = updated_policies + + # Merge in policy updates into application config and make available + _done_for_policy_update( + **_notify_container( + **_setup_for_discovery( + **_merge_policy_updates(**update_inputs)))) + + # Lifecycle interface calls for dcae.nodes.DockerHost |