From 9d0a254d0bcd6e966c47f2656527274ab1787d97 Mon Sep 17 00:00:00 2001 From: Michael Hwang Date: Tue, 12 Sep 2017 17:28:37 -0400 Subject: Combine all task inputs for create and start Change-Id: Ia86d3b26b5ecccd636fb171b3967f924b0cb1250 Issue-Id: DCAEGEN2-91 Signed-off-by: Michael Hwang --- docker/dockerplugin/decorators.py | 42 +++++++++++++++++++++++++++++---------- docker/dockerplugin/tasks.py | 17 +++++++++------- docker/dockerplugin/utils.py | 15 ++++++++++++++ 3 files changed, 57 insertions(+), 17 deletions(-) diff --git a/docker/dockerplugin/decorators.py b/docker/dockerplugin/decorators.py index 089231a..f83263b 100644 --- a/docker/dockerplugin/decorators.py +++ b/docker/dockerplugin/decorators.py @@ -25,6 +25,7 @@ from dockering import utils as doc from dockerplugin import discovery as dis from dockerplugin.exceptions import DockerPluginDeploymentError, \ DockerPluginDependencyNotReadyError +from dockerplugin import utils def monkeypatch_loggers(task_func): @@ -62,19 +63,40 @@ def wrap_error_handling_start(task_start_func): return wrapper -def merge_inputs_for_start(task_start_func): +def _wrapper_merge_inputs(task_func, properties, **kwargs): + """Merge Cloudify properties with input kwargs before calling task func""" + inputs = copy.deepcopy(properties) + # Recursively update + utils.update_dict(inputs, kwargs) + + # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext + # This has to be removed and not copied into runtime_properties else you get + # JSON serialization errors. + if "ctx" in inputs: + del inputs["ctx"] + + return task_func(**inputs) + +def merge_inputs_for_create(task_create_func): """Merge all inputs for start operation into one dict""" - def wrapper (**kwargs): - start_inputs = copy.deepcopy(ctx.instance.runtime_properties) - start_inputs.update(kwargs) + # Needed to wrap the wrapper because I was seeing issues with + # "RuntimeError: No context set in current execution thread" + def wrapper(**kwargs): + # NOTE: ctx.node.properties is an ImmutableProperties instance which is + # why it is passed into a mutable dict so that it can be deep copied + return _wrapper_merge_inputs(task_create_func, + dict(ctx.node.properties), **kwargs) + + return wrapper - # Apparently kwargs contains "ctx" which is cloudify.context.CloudifyContext - # This has to be removed and not copied into runtime_properties else you get - # JSON serialization errors. - if "ctx" in start_inputs: - del start_inputs["ctx"] +def merge_inputs_for_start(task_start_func): + """Merge all inputs for start operation into one dict""" - return task_start_func(**start_inputs) + # Needed to wrap the wrapper because I was seeing issues with + # "RuntimeError: No context set in current execution thread" + def wrapper(**kwargs): + return _wrapper_merge_inputs(task_start_func, + ctx.instance.runtime_properties, **kwargs) return wrapper diff --git a/docker/dockerplugin/tasks.py b/docker/dockerplugin/tasks.py index e42e47d..9d33a8b 100644 --- a/docker/dockerplugin/tasks.py +++ b/docker/dockerplugin/tasks.py @@ -28,7 +28,7 @@ import dockering as doc from dcaepolicy import Policies, POLICIES, POLICY_MESSAGE_TYPE from dockerplugin import discovery as dis from dockerplugin.decorators import monkeypatch_loggers, wrap_error_handling_start, \ - merge_inputs_for_start + merge_inputs_for_start, merge_inputs_for_create from dockerplugin.exceptions import DockerPluginDeploymentError, \ DockerPluginDependencyNotReadyError from dockerplugin import utils @@ -119,10 +119,11 @@ def _merge_policy_updates(**kwargs): return kwargs +@merge_inputs_for_create @monkeypatch_loggers @Policies.gather_policies_to_node @operation -def create_for_components(**kwargs): +def create_for_components(**create_inputs): """Create step for Docker containers that are components This interface is responible for: @@ -134,7 +135,7 @@ def create_for_components(**kwargs): **_setup_for_discovery( **_merge_policy_updates( **_generate_component_name( - **ctx.node.properties)))) + **create_inputs)))) def _parse_streams(**kwargs): @@ -201,10 +202,11 @@ def _setup_for_discovery_streams(**kwargs): return kwargs +@merge_inputs_for_create @monkeypatch_loggers @Policies.gather_policies_to_node @operation -def create_for_components_with_streams(**kwargs): +def create_for_components_with_streams(**create_inputs): """Create step for Docker containers that are components that use DMaaP This interface is responible for: @@ -220,12 +222,13 @@ def create_for_components_with_streams(**kwargs): **_merge_policy_updates( **_parse_streams( **_generate_component_name( - **ctx.node.properties)))))) + **create_inputs)))))) +@merge_inputs_for_create @monkeypatch_loggers @operation -def create_for_platforms(**kwargs): +def create_for_platforms(**create_inputs): """Create step for Docker containers that are platform components This interface is responible for: @@ -234,7 +237,7 @@ def create_for_platforms(**kwargs): """ _done_for_create( **_setup_for_discovery( - **ctx.node.properties)) + **create_inputs)) def _lookup_service(service_component_name, consul_host=CONSUL_HOST, diff --git a/docker/dockerplugin/utils.py b/docker/dockerplugin/utils.py index ed680c2..c45af68 100644 --- a/docker/dockerplugin/utils.py +++ b/docker/dockerplugin/utils.py @@ -20,9 +20,24 @@ import string import random +import collections def random_string(n): """Random generate an ascii string of "n" length""" corpus = string.ascii_lowercase + string.ascii_uppercase + string.digits return ''.join(random.choice(corpus) for x in range(n)) + + +def update_dict(d, u): + """Recursively updates dict + + Update dict d with dict u + """ + for k, v in u.iteritems(): + if isinstance(v, collections.Mapping): + r = update_dict(d.get(k, {}), v) + d[k] = r + else: + d[k] = u[k] + return d -- cgit 1.2.3-korg