diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py new file mode 100644 index 0000000..86a2edf --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + +import fasteners +import pytest + +from aria.orchestrator import events +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests.orchestrator.context import execute as execute_workflow +from tests.orchestrator.workflows.helpers import events_collector +from tests import mock +from tests import storage +from tests import helpers + + +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder + + +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False) + + +@operation +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + + +def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True) + + +@operation +def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + if not first: + raise RuntimeError('MESSAGE') + + +def _test(context, executor, lock_files, func, dataholder, expected_failure): + def _node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + key = 'key' + first_value = 'value1' + second_value = 'value2' + arguments = { + 'lock_files': lock_files, + 'key': key, + 'first_value': first_value, + 'second_value': second_value, + 'holder_path': dataholder.path + } + + node = _node(context) + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow + def mock_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments), + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments) + ) + + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + try: + execute_workflow(mock_workflow, context, executor) + except ExecutorException: + pass + + props = _node(context).attributes + assert dataholder['invocations'] == 2 + assert props[key].value == dataholder[key] + + exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] + if expected_failure: + assert exceptions + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) + + +@pytest.fixture +def lock_files(tmpdir): + return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) + + +def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path): + holder = helpers.FilesystemDataHolder(holder_path) + locker1 = fasteners.InterProcessLock(lock_files[0]) + locker2 = fasteners.InterProcessLock(lock_files[1]) + + first = locker1.acquire(blocking=False) + + if first: + # Give chance for both processes to acquire locks + while locker2.acquire(blocking=False): + locker2.release() + time.sleep(0.1) + else: + locker2.acquire() + + node.attributes[key] = first_value if first else second_value + holder['key'] = first_value if first else second_value + holder.setdefault('invocations', 0) + holder['invocations'] += 1 + + if first: + locker1.release() + else: + with locker1: + locker2.release() + + return first |