diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor')
6 files changed, 853 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py new file mode 100644 index 0000000..99d0b39 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import uuid +from contextlib import contextmanager + +import aria +from aria.modeling import models +from aria.orchestrator.context.common import BaseContext + + +class MockContext(object): + + INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS + + def __init__(self, storage, task_kwargs=None): + self.logger = logging.getLogger('mock_logger') + self._task_kwargs = task_kwargs or {} + self._storage = storage + self.task = MockTask(storage, **task_kwargs) + self.states = [] + self.exception = None + + @property + def serialization_dict(self): + return { + 'context_cls': self.__class__, + 'context': { + 'storage_kwargs': self._storage.serialization_dict, + 'task_kwargs': self._task_kwargs + } + } + + def __getattr__(self, item): + return None + + def close(self): + pass + + @property + def model(self): + return self._storage + + @classmethod + def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): + return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), + task_kwargs=(task_kwargs or {})) + + @property + @contextmanager + def persist_changes(self): + yield + + +class MockActor(object): + def __init__(self): + self.name = 'actor_name' + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, model, function, arguments=None, plugin_fk=None): + self.function = self.name = function + self.plugin_fk = plugin_fk + self.arguments = arguments or {} + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + self.logger = logging.getLogger() + self.attempts_count = 1 + self.max_attempts = 1 + self.ignore_failure = False + self.interface_name = 'interface_name' + self.operation_name = 'operation_name' + self.actor = MockActor() + self.node = self.actor + self.model = model + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @property + def plugin(self): + return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py new file mode 100644 index 0000000..32a68e0 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest +import retrying + +try: + import celery as _celery + app = _celery.Celery() + app.conf.update(CELERY_RESULT_BACKEND='amqp://') +except ImportError: + _celery = None + app = None + +import aria +from aria.modeling import models +from aria.orchestrator import events +from aria.orchestrator.workflows.executor import ( + thread, + process, + # celery +) + +import tests +from . import MockContext + + +def _get_function(func): + return '{module}.{func.__name__}'.format(module=__name__, func=func) + + +def execute_and_assert(executor, storage=None): + expected_value = 'value' + successful_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_successful_task)) + ) + failing_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_failing_task)) + ) + task_with_inputs = MockContext( + storage, + task_kwargs=dict(function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}) + ) + + for task in [successful_task, failing_task, task_with_inputs]: + executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() + + +def test_thread_execute(thread_executor): + execute_and_assert(thread_executor) + + +def test_process_execute(process_executor, storage): + execute_and_assert(process_executor, storage) + + +def mock_successful_task(**_): + pass + + +def mock_failing_task(**_): + raise MockException + + +def mock_task_with_input(input, **_): + raise MockException(input) + +if app: + mock_successful_task = app.task(mock_successful_task) + mock_failing_task = app.task(mock_failing_task) + mock_task_with_input = app.task(mock_task_with_input) + + +class MockException(Exception): + pass + + +@pytest.fixture +def storage(tmpdir): + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) + + +@pytest.fixture(params=[ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + # subprocess needs to load a tests module so we explicitly add the root directory as if + # the project has been installed in editable mode + # (celery.CeleryExecutor, {'app': app}) +]) +def thread_executor(request): + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) + yield result + result.close() + + +@pytest.fixture +def process_executor(): + result = process.ProcessExecutor(python_path=tests.ROOT_DIR) + yield result + result.close() + + +@pytest.fixture(autouse=True) +def register_signals(): + def start_handler(task, *args, **kwargs): + task.states.append('start') + + def success_handler(task, *args, **kwargs): + task.states.append('success') + + def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception + + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + yield + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py new file mode 100644 index 0000000..e050d18 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import Queue +import subprocess + +import pytest +import psutil +import retrying + +import aria + +from aria import operation +from aria.modeling import models +from aria.orchestrator import events +from aria.utils.plugin import create as create_plugin +from aria.orchestrator.workflows.executor import process + +import tests.storage +import tests.resources +from tests.helpers import FilesystemDataHolder +from tests.fixtures import ( # pylint: disable=unused-import + plugins_dir, + plugin_manager, +) +from . import MockContext + + +class TestProcessExecutor(object): + + def test_plugin_execution(self, executor, mock_plugin, model, queue): + ctx = MockContext( + model, + task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id) + ) + + executor.execute(ctx) + error = queue.get(timeout=60) + # tests/resources/plugins/mock-plugin1 is the plugin installed + # during this tests setup. The module mock_plugin1 contains a single + # operation named "operation" which calls an entry point defined in the plugin's + # setup.py. This entry points simply prints 'mock-plugin-output' to stdout. + # The "operation" operation that called this subprocess, then raises a RuntimeError + # with that subprocess output as the error message. + # This is what we assert here. This tests checks that both the PYTHONPATH (operation) + # and PATH (entry point) are properly updated in the subprocess in which the task is + # running. + assert isinstance(error, RuntimeError) + assert error.message == 'mock-plugin-output' + + def test_closed(self, executor, model): + executor.close() + with pytest.raises(RuntimeError) as exc_info: + executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) + assert 'closed' in exc_info.value.message + + def test_process_termination(self, executor, model, fs_test_holder, tmpdir): + freeze_script_path = str(tmpdir.join('freeze_script')) + with open(freeze_script_path, 'w+b') as f: + f.write( + '''import time +while True: + time.sleep(5) + ''' + ) + holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path) + script_path_argument = models.Argument.wrap('freezing_script_path', + str(tmpdir.join('freeze_script'))) + + model.argument.put(holder_path_argument) + model.argument.put(script_path_argument) + ctx = MockContext( + model, + task_kwargs=dict( + function='{0}.{1}'.format(__name__, freezing_task.__name__), + arguments=dict(holder_path=holder_path_argument, + freezing_script_path=script_path_argument)), + ) + + executor.execute(ctx) + + @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + def wait_for_extra_process_id(): + return fs_test_holder.get('subproc', False) + + task_pid = executor._tasks[ctx.task.id].proc.pid + extra_process_pid = wait_for_extra_process_id() + + assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids())) + executor.terminate(ctx.task.id) + + # Give a chance to the processes to terminate + time.sleep(2) + + # all processes should be either zombies or non existent + pids = [task_pid, extra_process_pid] + for pid in pids: + if pid in psutil.pids(): + assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE + else: + # making the test more readable + assert pid not in psutil.pids() + + +@pytest.fixture +def queue(): + _queue = Queue.Queue() + + def handler(_, exception=None, **kwargs): + _queue.put(exception) + + events.on_success_task_signal.connect(handler) + events.on_failure_task_signal.connect(handler) + try: + yield _queue + finally: + events.on_success_task_signal.disconnect(handler) + events.on_failure_task_signal.disconnect(handler) + + +@pytest.fixture +def fs_test_holder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = FilesystemDataHolder(dataholder_path) + return holder + + +@pytest.fixture +def executor(plugin_manager): + result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def mock_plugin(plugin_manager, tmpdir): + source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') + plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) + return plugin_manager.install(source=plugin_path) + + +@pytest.fixture +def model(tmpdir): + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) + + +@operation +def freezing_task(holder_path, freezing_script_path, **_): + holder = FilesystemDataHolder(holder_path) + holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid + while True: + time.sleep(5) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py new file mode 100644 index 0000000..86a2edf --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + +import fasteners +import pytest + +from aria.orchestrator import events +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests.orchestrator.context import execute as execute_workflow +from tests.orchestrator.workflows.helpers import events_collector +from tests import mock +from tests import storage +from tests import helpers + + +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder + + +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False) + + +@operation +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + + +def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True) + + +@operation +def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + if not first: + raise RuntimeError('MESSAGE') + + +def _test(context, executor, lock_files, func, dataholder, expected_failure): + def _node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + key = 'key' + first_value = 'value1' + second_value = 'value2' + arguments = { + 'lock_files': lock_files, + 'key': key, + 'first_value': first_value, + 'second_value': second_value, + 'holder_path': dataholder.path + } + + node = _node(context) + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow + def mock_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments), + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments) + ) + + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + try: + execute_workflow(mock_workflow, context, executor) + except ExecutorException: + pass + + props = _node(context).attributes + assert dataholder['invocations'] == 2 + assert props[key].value == dataholder[key] + + exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] + if expected_failure: + assert exceptions + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) + + +@pytest.fixture +def lock_files(tmpdir): + return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) + + +def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path): + holder = helpers.FilesystemDataHolder(holder_path) + locker1 = fasteners.InterProcessLock(lock_files[0]) + locker2 = fasteners.InterProcessLock(lock_files[1]) + + first = locker1.acquire(blocking=False) + + if first: + # Give chance for both processes to acquire locks + while locker2.acquire(blocking=False): + locker2.release() + time.sleep(0.1) + else: + locker2.acquire() + + node.attributes[key] = first_value if first else second_value + holder['key'] = first_value if first else second_value + holder.setdefault('invocations', 0) + holder['invocations'] += 1 + + if first: + locker1.release() + else: + with locker1: + locker2.release() + + return first diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py new file mode 100644 index 0000000..b26fa43 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria import extension +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests import mock +from tests import storage + + +def test_decorate_extension(context, executor): + arguments = {'arg1': 1, 'arg2': 2} + + def get_node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + node = get_node(context) + interface_name = 'test_interface' + operation_name = 'operation' + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + + @workflow + def mock_workflow(ctx, graph): + node = get_node(ctx) + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) + out = get_node(context).attributes.get('out').value + assert out['wrapper_arguments'] == arguments + assert out['function_arguments'] == arguments + + +@extension.process_executor +class MockProcessExecutorExtension(object): + + def decorate(self): + def decorator(function): + def wrapper(ctx, **operation_arguments): + with ctx.model.instrument(ctx.model.node.model_cls.attributes): + ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments} + function(ctx=ctx, **operation_arguments) + return wrapper + return decorator + + +@operation +def _mock_operation(ctx, **operation_arguments): + ctx.node.attributes['out']['function_arguments'] = operation_arguments + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py new file mode 100644 index 0000000..47ee2f7 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import pytest + +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation +from aria.orchestrator.workflows import exceptions + +import tests +from tests import mock +from tests import storage + + +_TEST_ATTRIBUTES = { + 'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo' +} + + +def test_track_changes_of_successful_operation(context, executor): + _run_workflow(context=context, executor=executor, op_func=_mock_success_operation) + _assert_tracked_changes_are_applied(context) + + +def test_track_changes_of_failed_operation(context, executor): + with pytest.raises(exceptions.ExecutorException): + _run_workflow(context=context, executor=executor, op_func=_mock_fail_operation) + _assert_tracked_changes_are_applied(context) + + +def _assert_tracked_changes_are_applied(context): + instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + assert all(instance.attributes[key].value == value + for key, value in _TEST_ATTRIBUTES.items()) + + +def _update_attributes(context): + context.node.attributes.clear() + context.node.attributes.update(_TEST_ATTRIBUTES) + + +def test_refresh_state_of_tracked_attributes(context, executor): + out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation) + assert out['after_refresh'] == out['after_change'] + assert out['initial'] != out['after_change'] + + +def test_apply_tracked_changes_during_an_operation(context, executor): + arguments = { + 'committed': {'some': 'new', 'properties': 'right here'}, + 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'} + } + + expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes + out = _run_workflow( + context=context, executor=executor, op_func=_mock_updating_operation, arguments=arguments) + + expected_after_update = expected_initial.copy() + expected_after_update.update(arguments['committed']) # pylint: disable=no-member + expected_after_change = expected_after_update.copy() + expected_after_change.update(arguments['changed_but_refreshed']) # pylint: disable=no-member + + assert out['initial'] == expected_initial + assert out['after_update'] == expected_after_update + assert out['after_change'] == expected_after_change + assert out['after_refresh'] == expected_after_change + + +def _run_workflow(context, executor, op_func, arguments=None): + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'test_interface' + operation_name = 'operation' + wf_arguments = arguments or {} + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function=_operation_mapping(op_func), + arguments=wf_arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow + def mock_workflow(ctx, graph): + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=wf_arguments) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) + out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') + return out.value if out else None + + +@operation +def _mock_success_operation(ctx): + _update_attributes(ctx) + + +@operation +def _mock_fail_operation(ctx): + _update_attributes(ctx) + raise RuntimeError + + +@operation +def _mock_refreshing_operation(ctx): + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update({'some': 'new', 'properties': 'right here'}) + out['after_change'] = copy.deepcopy(ctx.node.attributes) + ctx.model.node.refresh(ctx.node) + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out + + +@operation +def _mock_updating_operation(ctx, committed, changed_but_refreshed): + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update(committed) + ctx.model.node.update(ctx.node) + out['after_update'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes.update(changed_but_refreshed) + out['after_change'] = copy.deepcopy(ctx.node.attributes) + ctx.model.node.refresh(ctx.node) + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out + + +def _operation_mapping(func): + return '{name}.{func.__name__}'.format(name=__name__, func=func) + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) |