Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py')
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py | 167
1 file changed, 167 insertions(+), 0 deletions(-)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..86a2edf
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
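+"""
+Test that concurrent modifications to the same node attributes, made by two
+operation tasks running in separate processes, are handled consistently by
+the process executor, for both succeeding and failing tasks.
+"""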
+import time
+
+import fasteners
+import pytest
+
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+from tests import helpers
+
+
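+# File-backed dict shared between the test process and the operation
+# subprocesses spawned by the process executor.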
+@pytest.fixture
+def dataholder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = helpers.FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+ _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+ first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+ if not first:
+ raise RuntimeError('MESSAGE')
+
+
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
+ def _node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ key = 'key'
+ first_value = 'value1'
+ second_value = 'value2'
+ arguments = {
+ 'lock_files': lock_files,
+ 'key': key,
+ 'first_value': first_value,
+ 'second_value': second_value,
+ 'holder_path': dataholder.path
+ }
+
+ node = _node(context)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(graph, **_):
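+        # Queue two identical operation tasks so the executor runs them
+        # concurrently; each one invokes the operation function in its own
+        # process.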
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments),
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments)
+ )
+
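+    # The failing variant raises inside one of the invocations; the resulting
+    # ExecutorException is swallowed here and the failure is asserted through
+    # the collected on_failure_task_signal events instead.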
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ try:
+ execute_workflow(mock_workflow, context, executor)
+ except ExecutorException:
+ pass
+
+ props = _node(context).attributes
+ assert dataholder['invocations'] == 2
+ assert props[key].value == dataholder[key]
+
+ exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+ if expected_failure:
+ assert exceptions
+
+
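+# Executor that runs each task in a separate Python process; tests.ROOT_DIR is
+# added to the python path so the subprocesses can import the operation
+# functions defined in this module.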
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+ return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
+ holder = helpers.FilesystemDataHolder(holder_path)
+ locker1 = fasteners.InterProcessLock(lock_files[0])
+ locker2 = fasteners.InterProcessLock(lock_files[1])
+
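+    # Whichever process grabs locker1 first is the "first" invocation; the two
+    # file locks are then used to ensure both processes are inside this
+    # function at the same time before either of them writes.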
+ first = locker1.acquire(blocking=False)
+
+ if first:
+        # Spin until the second invocation has acquired locker2, giving both
+        # processes a chance to hold their locks concurrently
+ while locker2.acquire(blocking=False):
+ locker2.release()
+ time.sleep(0.1)
+ else:
+ locker2.acquire()
+
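+    # At this point both invocations are running concurrently: each writes a
+    # distinct value to the same node attribute and mirrors the write in the
+    # shared holder so the test can check which value won.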
+ node.attributes[key] = first_value if first else second_value
+    holder[key] = first_value if first else second_value
+ holder.setdefault('invocations', 0)
+ holder['invocations'] += 1
+
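+    # Release order: the first invocation frees locker1; the second waits on
+    # locker1 before releasing locker2, keeping the interleaving deterministic.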
+ if first:
+ locker1.release()
+ else:
+ with locker1:
+ locker2.release()
+
+ return first