Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor')
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py | 98
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py | 149
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py | 172
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py | 167
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py | 99
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py | 168
6 files changed, 853 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py
new file mode 100644
index 0000000..99d0b39
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import uuid
+from contextlib import contextmanager
+
+import aria
+from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
+
+
+class MockContext(object):
+
+ INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
+ def __init__(self, storage, task_kwargs=None):
+ self.logger = logging.getLogger('mock_logger')
+ self._task_kwargs = task_kwargs or {}
+ self._storage = storage
+ self.task = MockTask(storage, **self._task_kwargs)
+ self.states = []
+ self.exception = None
+
+ @property
+ def serialization_dict(self):
+ return {
+ 'context_cls': self.__class__,
+ 'context': {
+ 'storage_kwargs': self._storage.serialization_dict,
+ 'task_kwargs': self._task_kwargs
+ }
+ }
+
+ def __getattr__(self, item):
+ return None
+
+ def close(self):
+ pass
+
+ @property
+ def model(self):
+ return self._storage
+
+ @classmethod
+ def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
+ return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+ task_kwargs=(task_kwargs or {}))
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+
+
+class MockActor(object):
+ def __init__(self):
+ self.name = 'actor_name'
+
+
+class MockTask(object):
+
+ INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+ def __init__(self, model, function, arguments=None, plugin_fk=None):
+ self.function = self.name = function
+ self.plugin_fk = plugin_fk
+ self.arguments = arguments or {}
+ self.states = []
+ self.exception = None
+ self.id = str(uuid.uuid4())
+ self.logger = logging.getLogger()
+ self.attempts_count = 1
+ self.max_attempts = 1
+ self.ignore_failure = False
+ self.interface_name = 'interface_name'
+ self.operation_name = 'operation_name'
+ self.actor = MockActor()
+ self.node = self.actor
+ self.model = model
+
+ for state in models.Task.STATES:
+ setattr(self, state.upper(), state)
+
+ @property
+ def plugin(self):
+ return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None
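Note: the serialization_dict / instantiate_from_dict pair above is the round-trip the process executor relies on to rebuild a context inside the task subprocess. A minimal sketch of that round-trip (assuming the storage object was created via aria.application_model_storage, as in the fixtures below):

    serialized = ctx.serialization_dict
    rebuilt = serialized['context_cls'].instantiate_from_dict(**serialized['context'])
    assert isinstance(rebuilt, MockContext)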
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py
new file mode 100644
index 0000000..32a68e0
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+import retrying
+
+try:
+ import celery as _celery
+ app = _celery.Celery()
+ app.conf.update(CELERY_RESULT_BACKEND='amqp://')
+except ImportError:
+ _celery = None
+ app = None
+
+import aria
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.workflows.executor import (
+ thread,
+ process,
+ # celery
+)
+
+import tests
+from . import MockContext
+
+
+def _get_function(func):
+ return '{module}.{func.__name__}'.format(module=__name__, func=func)
+
+
+def execute_and_assert(executor, storage=None):
+ expected_value = 'value'
+ successful_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_successful_task))
+ )
+ failing_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_failing_task))
+ )
+ task_with_inputs = MockContext(
+ storage,
+ task_kwargs=dict(function=_get_function(mock_task_with_input),
+ arguments={'input': models.Argument.wrap('input', 'value')})
+ )
+
+ for task in [successful_task, failing_task, task_with_inputs]:
+ executor.execute(task)
+
+ @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+ def assertion():
+ assert successful_task.states == ['start', 'success']
+ assert failing_task.states == ['start', 'failure']
+ assert task_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_task.exception, MockException)
+ assert isinstance(task_with_inputs.exception, MockException)
+ assert task_with_inputs.exception.message == expected_value
+ assertion()
+
+
+def test_thread_execute(thread_executor):
+ execute_and_assert(thread_executor)
+
+
+def test_process_execute(process_executor, storage):
+ execute_and_assert(process_executor, storage)
+
+
+def mock_successful_task(**_):
+ pass
+
+
+def mock_failing_task(**_):
+ raise MockException
+
+
+def mock_task_with_input(input, **_):
+ raise MockException(input)
+
+if app:
+ mock_successful_task = app.task(mock_successful_task)
+ mock_failing_task = app.task(mock_failing_task)
+ mock_task_with_input = app.task(mock_task_with_input)
+
+
+class MockException(Exception):
+ pass
+
+
+@pytest.fixture
+def storage(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
+
+
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ # (celery.CeleryExecutor, {'app': app})
+])
+def thread_executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
+
+
+@pytest.fixture
+def process_executor():
+ # the subprocess needs to be able to load the tests module, so we explicitly add the
+ # root directory to its python path, as if the project had been installed in editable mode
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ yield result
+ result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+ def start_handler(task, *args, **kwargs):
+ task.states.append('start')
+
+ def success_handler(task, *args, **kwargs):
+ task.states.append('success')
+
+ def failure_handler(task, exception, *args, **kwargs):
+ task.states.append('failure')
+ task.exception = exception
+
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)
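Note: assertion() above turns an eventually-consistent check into a blocking one: retrying re-invokes the wrapped function whenever it raises (AssertionError included), waiting 100 ms between attempts and giving up after 10 s. A standalone sketch of the same pattern, with a hypothetical wait_for_states helper:

    import retrying

    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
    def wait_for_states(task, expected):
        # raising AssertionError triggers another attempt until the
        # 10-second stop_max_delay elapses
        assert task.states == expected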
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
new file mode 100644
index 0000000..e050d18
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import Queue
+import subprocess
+
+import pytest
+import psutil
+import retrying
+
+import aria
+
+from aria import operation
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.utils.plugin import create as create_plugin
+from aria.orchestrator.workflows.executor import process
+
+import tests.storage
+import tests.resources
+from tests.helpers import FilesystemDataHolder
+from tests.fixtures import ( # pylint: disable=unused-import
+ plugins_dir,
+ plugin_manager,
+)
+from . import MockContext
+
+
+class TestProcessExecutor(object):
+
+ def test_plugin_execution(self, executor, mock_plugin, model, queue):
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+ )
+
+ executor.execute(ctx)
+ error = queue.get(timeout=60)
+ # tests/resources/plugins/mock-plugin1 is the plugin installed during this
+ # test's setup. The module mock_plugin1 contains a single operation named
+ # "operation", which invokes an entry point defined in the plugin's setup.py;
+ # that entry point simply prints 'mock-plugin-output' to stdout. The
+ # "operation" operation, which launched the subprocess, then raises a
+ # RuntimeError with the subprocess's output as the error message.
+ # That is what we assert here: the test checks that both PYTHONPATH (for the
+ # operation module) and PATH (for the entry point) are properly set in the
+ # subprocess in which the task runs.
+ assert isinstance(error, RuntimeError)
+ assert error.message == 'mock-plugin-output'
+
+ def test_closed(self, executor, model):
+ executor.close()
+ with pytest.raises(RuntimeError) as exc_info:
+ executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
+ assert 'closed' in exc_info.value.message
+
+ def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+ freeze_script_path = str(tmpdir.join('freeze_script'))
+ with open(freeze_script_path, 'w+b') as f:
+ f.write(
+ '''import time
+while True:
+ time.sleep(5)
+ '''
+ )
+ holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+ script_path_argument = models.Argument.wrap('freezing_script_path',
+ freeze_script_path)
+
+ model.argument.put(holder_path_argument)
+ model.argument.put(script_path_argument)
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(
+ function='{0}.{1}'.format(__name__, freezing_task.__name__),
+ arguments=dict(holder_path=holder_path_argument,
+ freezing_script_path=script_path_argument)),
+ )
+
+ executor.execute(ctx)
+
+ @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+ def wait_for_extra_process_id():
+ return fs_test_holder.get('subproc', False)
+
+ task_pid = executor._tasks[ctx.task.id].proc.pid
+ extra_process_pid = wait_for_extra_process_id()
+
+ assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
+ executor.terminate(ctx.task.id)
+
+ # Give a chance to the processes to terminate
+ time.sleep(2)
+
+ # all processes should be either zombies or non existent
+ pids = [task_pid, extra_process_pid]
+ for pid in pids:
+ if pid in psutil.pids():
+ assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
+ else:
+ # making the test more readable
+ assert pid not in psutil.pids()
+
+
+@pytest.fixture
+def queue():
+ _queue = Queue.Queue()
+
+ def handler(_, exception=None, **kwargs):
+ _queue.put(exception)
+
+ events.on_success_task_signal.connect(handler)
+ events.on_failure_task_signal.connect(handler)
+ try:
+ yield _queue
+ finally:
+ events.on_success_task_signal.disconnect(handler)
+ events.on_failure_task_signal.disconnect(handler)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+@pytest.fixture
+def executor(plugin_manager):
+ result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def mock_plugin(plugin_manager, tmpdir):
+ source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+ plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
+ return plugin_manager.install(source=plugin_path)
+
+
+@pytest.fixture
+def model(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, freezing_script_path, **_):
+ holder = FilesystemDataHolder(holder_path)
+ holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path]).pid
+ while True:
+ time.sleep(5)
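Note: test_process_termination above treats a lingering zombie as successfully terminated, since the parent may not have reaped the child yet. A sketch of that check factored into a predicate (psutil calls as used in the test):

    import psutil

    def gone_or_zombie(pid):
        # a pid passes if its process no longer exists, or survives only
        # as an unreaped zombie after executor.terminate()
        if pid not in psutil.pids():
            return True
        return psutil.Process(pid).status() == psutil.STATUS_ZOMBIE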
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..86a2edf
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import fasteners
+import pytest
+
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+from tests import helpers
+
+
+@pytest.fixture
+def dataholder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = helpers.FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+ _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+ first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+ if not first:
+ raise RuntimeError('MESSAGE')
+
+
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
+ def _node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ key = 'key'
+ first_value = 'value1'
+ second_value = 'value2'
+ arguments = {
+ 'lock_files': lock_files,
+ 'key': key,
+ 'first_value': first_value,
+ 'second_value': second_value,
+ 'holder_path': dataholder.path
+ }
+
+ node = _node(context)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments),
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments)
+ )
+
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ try:
+ execute_workflow(mock_workflow, context, executor)
+ except ExecutorException:
+ pass
+
+ props = _node(context).attributes
+ assert dataholder['invocations'] == 2
+ assert props[key].value == dataholder[key]
+
+ exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+ if expected_failure:
+ assert exceptions
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+ return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
+ holder = helpers.FilesystemDataHolder(holder_path)
+ locker1 = fasteners.InterProcessLock(lock_files[0])
+ locker2 = fasteners.InterProcessLock(lock_files[1])
+
+ first = locker1.acquire(blocking=False)
+
+ if first:
+ # Wait until the second invocation is holding its own lock
+ while locker2.acquire(blocking=False):
+ locker2.release()
+ time.sleep(0.1)
+ else:
+ locker2.acquire()
+
+ node.attributes[key] = first_value if first else second_value
+ holder[key] = first_value if first else second_value
+ holder.setdefault('invocations', 0)
+ holder['invocations'] += 1
+
+ if first:
+ locker1.release()
+ else:
+ with locker1:
+ locker2.release()
+
+ return first
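Note: _concurrent_update above is a two-file lock handshake that guarantees the two task invocations overlap in time before either one writes node.attributes. A condensed, self-contained sketch of the protocol (release order simplified; fasteners API as used above):

    import time
    import fasteners

    def handshake(lock_files):
        locker1 = fasteners.InterProcessLock(lock_files[0])
        locker2 = fasteners.InterProcessLock(lock_files[1])
        first = locker1.acquire(blocking=False)  # only one caller wins
        if first:
            # spin until the other caller is holding its own lock,
            # proving both processes are alive simultaneously
            while locker2.acquire(blocking=False):
                locker2.release()
                time.sleep(0.1)
            locker1.release()
        else:
            locker2.acquire()
            locker2.release()
        return first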
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py
new file mode 100644
index 0000000..b26fa43
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import extension
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests import mock
+from tests import storage
+
+
+def test_decorate_extension(context, executor):
+ arguments = {'arg1': 1, 'arg2': 2}
+
+ def get_node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ node = get_node(context)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ node = get_node(ctx)
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments)
+ graph.add_tasks(task)
+ return graph
+ graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
+ out = get_node(context).attributes.get('out').value
+ assert out['wrapper_arguments'] == arguments
+ assert out['function_arguments'] == arguments
+
+
+@extension.process_executor
+class MockProcessExecutorExtension(object):
+
+ def decorate(self):
+ def decorator(function):
+ def wrapper(ctx, **operation_arguments):
+ with ctx.model.instrument(ctx.model.node.model_cls.attributes):
+ ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
+ function(ctx=ctx, **operation_arguments)
+ return wrapper
+ return decorator
+
+
+@operation
+def _mock_operation(ctx, **operation_arguments):
+ ctx.node.attributes['out']['function_arguments'] = operation_arguments
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
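Note: the extension mechanism exercised above lets every class registered via @extension.process_executor contribute a decorator that the process executor applies to the task function before invoking it. A minimal sketch of that shape (plain functions, not the executor's actual wiring):

    def decorator(function):
        # what MockProcessExecutorExtension.decorate() returns; wrapper()
        # runs inside the task subprocess with the operation's own ctx
        def wrapper(ctx, **operation_arguments):
            # an extension may record or transform the arguments here
            return function(ctx=ctx, **operation_arguments)
        return wrapper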
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
new file mode 100644
index 0000000..47ee2f7
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+import pytest
+
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+from aria.orchestrator.workflows import exceptions
+
+import tests
+from tests import mock
+from tests import storage
+
+
+_TEST_ATTRIBUTES = {
+ 'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo'
+}
+
+
+def test_track_changes_of_successful_operation(context, executor):
+ _run_workflow(context=context, executor=executor, op_func=_mock_success_operation)
+ _assert_tracked_changes_are_applied(context)
+
+
+def test_track_changes_of_failed_operation(context, executor):
+ with pytest.raises(exceptions.ExecutorException):
+ _run_workflow(context=context, executor=executor, op_func=_mock_fail_operation)
+ _assert_tracked_changes_are_applied(context)
+
+
+def _assert_tracked_changes_are_applied(context):
+ instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ assert all(instance.attributes[key].value == value
+ for key, value in _TEST_ATTRIBUTES.items())
+
+
+def _update_attributes(context):
+ context.node.attributes.clear()
+ context.node.attributes.update(_TEST_ATTRIBUTES)
+
+
+def test_refresh_state_of_tracked_attributes(context, executor):
+ out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation)
+ assert out['after_refresh'] == out['after_change']
+ assert out['initial'] != out['after_change']
+
+
+def test_apply_tracked_changes_during_an_operation(context, executor):
+ arguments = {
+ 'committed': {'some': 'new', 'properties': 'right here'},
+ 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
+ }
+
+ expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes
+ out = _run_workflow(
+ context=context, executor=executor, op_func=_mock_updating_operation, arguments=arguments)
+
+ expected_after_update = expected_initial.copy()
+ expected_after_update.update(arguments['committed']) # pylint: disable=no-member
+ expected_after_change = expected_after_update.copy()
+ expected_after_change.update(arguments['changed_but_refreshed']) # pylint: disable=no-member
+
+ assert out['initial'] == expected_initial
+ assert out['after_update'] == expected_after_update
+ assert out['after_change'] == expected_after_change
+ assert out['after_refresh'] == expected_after_change
+
+
+def _run_workflow(context, executor, op_func, arguments=None):
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ wf_arguments = arguments or {}
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=_operation_mapping(op_func),
+ arguments=wf_arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=wf_arguments)
+ graph.add_tasks(task)
+ return graph
+ graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
+ out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
+ return out.value if out else None
+
+
+@operation
+def _mock_success_operation(ctx):
+ _update_attributes(ctx)
+
+
+@operation
+def _mock_fail_operation(ctx):
+ _update_attributes(ctx)
+ raise RuntimeError
+
+
+@operation
+def _mock_refreshing_operation(ctx):
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update({'some': 'new', 'properties': 'right here'})
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
+ ctx.model.node.refresh(ctx.node)
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
+
+
+@operation
+def _mock_updating_operation(ctx, committed, changed_but_refreshed):
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update(committed)
+ ctx.model.node.update(ctx.node)
+ out['after_update'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes.update(changed_but_refreshed)
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
+ ctx.model.node.refresh(ctx.node)
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
+
+
+def _operation_mapping(func):
+ return '{name}.{func.__name__}'.format(name=__name__, func=func)
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)