Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows')
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py | 16
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py | 14
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py | 223
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py | 745
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py | 70
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py | 64
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py | 100
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py | 46
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py | 47
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py | 14
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py | 564
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py | 171
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py | 153
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py | 172
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py | 98
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py | 149
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py | 172
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py | 167
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py | 99
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py | 168
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py | 37
21 files changed, 3289 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py
new file mode 100644
index 0000000..7f0fd56
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import api, core
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
new file mode 100644
index 0000000..9d91b6b
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+
+from tests import mock, storage
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ """
+ Create the following graph in storage:
+ dependency_node <------ dependent_node
+ """
+ simple_context = mock.context.simple(str(tmpdir), inmemory=False)
+ simple_context.model.execution.put(mock.models.create_execution(simple_context.service))
+ yield simple_context
+ storage.release_sqlite_storage(simple_context.model)
+
+
+class TestOperationTask(object):
+
+ def test_node_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'create'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.plugin.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments),)
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.interfaces[interface_name] = interface
+ ctx.model.node.update(node)
+ max_attempts = 10
+ retry_interval = 10
+ ignore_failure = True
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval,
+ ignore_failure=ignore_failure)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='node',
+ name=node.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == node
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.ignore_failure == ignore_failure
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_source_relationship_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'preconfigure'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.plugin.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments)
+ )
+
+ relationship = ctx.model.relationship.list()[0]
+ relationship.interfaces[interface.name] = interface
+ max_attempts = 10
+ retry_interval = 10
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='relationship',
+ name=relationship.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == relationship
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_target_relationship_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'preconfigure'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.plugin.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments)
+ )
+
+ relationship = ctx.model.relationship.list()[0]
+ relationship.interfaces[interface.name] = interface
+ max_attempts = 10
+ retry_interval = 10
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='relationship',
+ name=relationship.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == relationship
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_operation_task_default_values(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'create'
+
+ plugin = mock.models.create_plugin('package', '0.1')
+ ctx.model.plugin.update(plugin)
+
+ dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path'))
+ dependency_node.interfaces[interface_name] = interface
+
+ with context.workflow.current.push(ctx):
+ task = api.task.OperationTask(
+ dependency_node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+
+ assert task.arguments == {}
+ assert task.retry_interval == ctx._task_retry_interval
+ assert task.max_attempts == ctx._task_max_attempts
+ assert task.ignore_failure == ctx._task_ignore_failure
+ assert task.plugin is plugin
+
+
+class TestWorkflowTask(object):
+
+ def test_workflow_task_creation(self, ctx):
+
+ workspace = {}
+
+ mock_class = type('mock_class', (object,), {'test_attribute': True})
+
+ def sub_workflow(**kwargs):
+ workspace.update(kwargs)
+ return mock_class
+
+ with context.workflow.current.push(ctx):
+ workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
+ assert workflow_task.graph is mock_class
+ assert workflow_task.test_attribute is True
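
The OperationTask tests above share one pattern: attach an interface to an actor (a node or a relationship), push a workflow context, and only then construct the task. A minimal sketch of that pattern, assuming the same fixtures as above (the helper name is illustrative):

    from aria.orchestrator import context
    from aria.orchestrator.workflows import api

    def build_operation_task(ctx, actor, interface_name, operation_name):
        # OperationTask must be constructed while a workflow context is pushed;
        # retry_interval/max_attempts/ignore_failure otherwise default from ctx
        with context.workflow.current.push(ctx):
            return api.task.OperationTask(
                actor,
                interface_name=interface_name,
                operation_name=operation_name,
                max_attempts=10,
                retry_interval=10)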
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py
new file mode 100644
index 0000000..a569386
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py
@@ -0,0 +1,745 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task_graph, task
+
+
+class MockTask(task.BaseTask):
+ def __init__(self):
+ super(MockTask, self).__init__(ctx={})
+
+
+@pytest.fixture
+def graph():
+ return task_graph.TaskGraph(name='mock-graph')
+
+
+class TestTaskGraphTasks(object):
+
+ def test_add_task(self, graph):
+ task = MockTask()
+ add_result = graph.add_tasks(task)
+ assert add_result == [task]
+ tasks = [t for t in graph.tasks]
+ assert len(tasks) == 1
+ assert tasks[0] == task
+
+ def test_add_empty_group(self, graph):
+ result = graph.add_tasks([])
+ assert result == []
+
+ def test_add_group(self, graph):
+ tasks = [MockTask(), MockTask(), MockTask()]
+ added_tasks = graph.add_tasks(*tasks)
+ assert added_tasks == tasks
+
+ def test_add_partially_existing_group(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ tasks = [MockTask(), task, MockTask()]
+ added_tasks = graph.add_tasks(*tasks)
+ assert added_tasks == [tasks[0], tasks[2]]
+
+ def test_add_recursively_group(self, graph):
+ recursive_group = [MockTask(), MockTask()]
+ tasks = [MockTask(), recursive_group, MockTask()]
+ added_tasks = graph.add_tasks(tasks)
+ assert added_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]]
+
+ def test_add_existing_task(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # adding a task already in the graph should have no effect and return an empty list
+ add_result = graph.add_tasks(task)
+ assert add_result == []
+ tasks = [t for t in graph.tasks]
+ assert len(tasks) == 1
+ assert tasks[0] == task
+
+ def test_remove_task(self, graph):
+ task = MockTask()
+ other_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(other_task)
+ graph.remove_tasks(other_task)
+ tasks = [t for t in graph.tasks]
+ assert len(tasks) == 1
+ assert tasks[0] == task
+
+ def test_remove_tasks_with_dependency(self, graph):
+ task = MockTask()
+ dependent_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependent_task)
+ graph.add_dependency(dependent_task, task)
+ remove_result = graph.remove_tasks(dependent_task)
+ assert remove_result == [dependent_task]
+ tasks = [t for t in graph.tasks]
+ assert len(tasks) == 1
+ assert tasks[0] == task
+ # asserting no dependencies are left for the dependent task
+ assert len(list(graph.get_dependencies(task))) == 0
+
+ def test_remove_empty_group(self, graph):
+ result = graph.remove_tasks([])
+ assert result == []
+
+ def test_remove_group(self, graph):
+ tasks = [MockTask(), MockTask(), MockTask()]
+ graph.add_tasks(*tasks)
+ removed_tasks = graph.remove_tasks(*tasks)
+ assert removed_tasks == tasks
+
+ def test_remove_partially_existing_group(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ tasks = [MockTask(), task, MockTask()]
+ removed_tasks = graph.remove_tasks(*tasks)
+ assert removed_tasks == [task]
+
+ def test_remove_recursively_group(self, graph):
+ recursive_group = [MockTask(), MockTask()]
+ tasks = [MockTask(), recursive_group, MockTask()]
+ graph.add_tasks(tasks)
+ removed_tasks = graph.remove_tasks(tasks)
+ assert removed_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]]
+
+ def test_remove_nonexistent_task(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ # removing a task not in the graph should have no effect and return an empty list
+ remove_result = graph.remove_tasks(task_not_in_graph)
+ assert remove_result == []
+ tasks = [t for t in graph.tasks]
+ assert len(tasks) == 1
+ assert tasks[0] == task
+
+ def test_has_task(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ assert graph.has_tasks(task) is True
+
+ def test_has_nonexistent_task(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ assert graph.has_tasks(task_not_in_graph) is False
+
+ def test_has_empty_group(self, graph):
+ # the "empty task" is in the graph
+ assert graph.has_tasks([]) is True
+
+ def test_has_group(self, graph):
+ tasks = [MockTask(), MockTask(), MockTask()]
+ graph.add_tasks(*tasks)
+ assert graph.has_tasks(*tasks) is True
+
+ def test_has_partially_existing_group(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ tasks = [MockTask(), task, MockTask()]
+ assert graph.has_tasks(tasks) is False
+
+ def test_has_recursively_group(self, graph):
+ recursive_group = [MockTask(), MockTask()]
+ tasks = [MockTask(), recursive_group, MockTask()]
+ graph.add_tasks(tasks)
+ assert graph.has_tasks(tasks) is True
+
+ def test_get_task(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ assert graph.get_task(task.id) == task
+
+ def test_get_nonexistent_task(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.get_task(task_not_in_graph.id)
+
+
+class TestTaskGraphGraphTraversal(object):
+
+ def test_tasks_iteration(self, graph):
+ task = MockTask()
+ other_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(other_task)
+ tasks = [t for t in graph.tasks]
+ assert set(tasks) == set([task, other_task])
+
+ def test_get_dependents(self, graph):
+ task = MockTask()
+ dependent_task_1 = MockTask()
+ dependent_task_2 = MockTask()
+ transitively_dependent_task = MockTask()
+
+ graph.add_tasks(task)
+ graph.add_tasks(dependent_task_1)
+ graph.add_tasks(dependent_task_2)
+ graph.add_tasks(transitively_dependent_task)
+
+ graph.add_dependency(dependent_task_1, task)
+ graph.add_dependency(dependent_task_2, task)
+ graph.add_dependency(transitively_dependent_task, dependent_task_2)
+
+ dependent_tasks = list(graph.get_dependents(task))
+ # transitively_dependent_task not expected to appear in the result
+ assert set(dependent_tasks) == set([dependent_task_1, dependent_task_2])
+
+ def test_get_task_empty_dependents(self, graph):
+ task = MockTask()
+ other_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(other_task)
+ dependent_tasks = list(graph.get_dependents(task))
+ assert len(dependent_tasks) == 0
+
+ def test_get_nonexistent_task_dependents(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ list(graph.get_dependents(task_not_in_graph))
+
+ def test_get_dependencies(self, graph):
+ task = MockTask()
+ dependency_task_1 = MockTask()
+ dependency_task_2 = MockTask()
+ transitively_dependency_task = MockTask()
+
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task_1)
+ graph.add_tasks(dependency_task_2)
+ graph.add_tasks(transitively_dependency_task)
+
+ graph.add_dependency(task, dependency_task_1)
+ graph.add_dependency(task, dependency_task_2)
+ graph.add_dependency(dependency_task_2, transitively_dependency_task)
+
+ dependency_tasks = list(graph.get_dependencies(task))
+ # transitively_dependency_task not expected to appear in the result
+ assert set(dependency_tasks) == set([dependency_task_1, dependency_task_2])
+
+ def test_get_task_empty_dependencies(self, graph):
+ task = MockTask()
+ other_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(other_task)
+ dependency_tasks = list(graph.get_dependencies(task))
+ assert len(dependency_tasks) == 0
+
+ def test_get_nonexistent_task_dependencies(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ list(graph.get_dependencies(task_not_in_graph))
+
+
+class TestTaskGraphDependencies(object):
+
+ def test_add_dependency(self, graph):
+ task = MockTask()
+ dependency_task = MockTask()
+ unrelated_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task)
+ graph.add_tasks(unrelated_task)
+ graph.add_dependency(task, dependency_task)
+ add_result = graph.has_dependency(task, dependency_task)
+ assert add_result is True
+ dependency_tasks = list(graph.get_dependencies(task))
+ assert len(dependency_tasks) == 1
+ assert dependency_tasks[0] == dependency_task
+
+ def test_add_existing_dependency(self, graph):
+ task = MockTask()
+ dependency_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task)
+ graph.add_dependency(task, dependency_task)
+ add_result = graph.has_dependency(task, dependency_task)
+ assert add_result is True
+ # adding a dependency already in the graph should have no effect
+ graph.add_dependency(task, dependency_task)
+ add_result = graph.has_dependency(task, dependency_task)
+ assert add_result is True
+ dependency_tasks = list(graph.get_dependencies(task))
+ assert len(dependency_tasks) == 1
+ assert dependency_tasks[0] == dependency_task
+
+ def test_add_dependency_nonexistent_dependent(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.add_dependency(task_not_in_graph, task)
+
+ def test_add_dependency_nonexistent_dependency(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.add_dependency(task, task_not_in_graph)
+
+ def test_add_dependency_empty_dependent(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # adding a dependency with an empty dependent should have no effect
+ graph.add_dependency([], task)
+ assert set(graph.tasks) == set((task,))
+
+ def test_add_dependency_empty_dependency(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # adding a dependency with an empty dependency should have no effect
+ graph.add_dependency(task, [])
+ assert set(graph.tasks) == set((task,))
+
+ def test_add_dependency_dependent_group(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ graph.add_dependency(group_tasks, task)
+ assert graph.has_dependency(group_tasks[0], task) is True
+ assert graph.has_dependency(group_tasks[1], task) is True
+ assert graph.has_dependency(group_tasks[2], task) is True
+
+ def test_add_dependency_dependency_group(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ graph.add_dependency(task, group_tasks)
+ assert graph.has_dependency(task, group_tasks[0]) is True
+ assert graph.has_dependency(task, group_tasks[1]) is True
+ assert graph.has_dependency(task, group_tasks[2]) is True
+
+ def test_add_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ graph.add_dependency(group_1_tasks, group_2_tasks)
+ for group_2_task in group_2_tasks:
+ assert graph.has_dependency(group_1_tasks[0], group_2_task) is True
+ assert graph.has_dependency(group_1_tasks[1], group_2_task) is True
+ assert graph.has_dependency(group_1_tasks[2], group_2_task) is True
+
+ def test_add_dependency_dependency_group_with_some_existing_dependencies(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ # adding a dependency on a specific task manually,
+ # before adding a dependency on the whole group
+ graph.add_dependency(task, group_tasks[1])
+ graph.add_dependency(task, group_tasks)
+ assert graph.has_dependency(task, group_tasks[0]) is True
+ assert graph.has_dependency(task, group_tasks[1]) is True
+ assert graph.has_dependency(task, group_tasks[2]) is True
+
+ def test_add_existing_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ graph.add_dependency(group_1_tasks, group_2_tasks)
+ add_result = graph.has_dependency(group_1_tasks, group_2_tasks)
+ assert add_result is True
+ # adding a dependency already in graph - should have no effect, and return False
+ graph.add_dependency(group_1_tasks, group_2_tasks)
+ add_result = graph.has_dependency(group_1_tasks, group_2_tasks)
+ assert add_result is True
+ for group_2_task in group_2_tasks:
+ assert graph.has_dependency(group_1_tasks[0], group_2_task) is True
+ assert graph.has_dependency(group_1_tasks[1], group_2_task) is True
+ assert graph.has_dependency(group_1_tasks[2], group_2_task) is True
+
+ def test_has_dependency(self, graph):
+ task = MockTask()
+ dependency_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task)
+ graph.add_dependency(task, dependency_task)
+ assert graph.has_dependency(task, dependency_task) is True
+
+ def test_has_nonexistent_dependency(self, graph):
+ task = MockTask()
+ other_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(other_task)
+ assert graph.has_dependency(task, other_task) is False
+
+ def test_has_dependency_nonexistent_dependent(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.has_dependency(task_not_in_graph, task)
+
+ def test_has_dependency_nonexistent_dependency(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.has_dependency(task, task_not_in_graph)
+
+ def test_has_dependency_empty_dependent(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # an empty dependent is expected to yield False
+ assert graph.has_dependency([], task) is False
+
+ def test_has_dependency_empty_dependency(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # an empty dependency is expected to yield False
+ assert graph.has_dependency(task, []) is False
+
+ def test_has_dependency_dependent_group(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ assert graph.has_dependency(group_tasks, task) is False
+ graph.add_dependency(group_tasks, task)
+ assert graph.has_dependency(group_tasks, task) is True
+
+ def test_has_dependency_dependency_parallel(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ assert graph.has_dependency(task, group_tasks) is False
+ graph.add_dependency(task, group_tasks)
+ assert graph.has_dependency(task, group_tasks) is True
+
+ def test_has_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ assert graph.has_dependency(group_2_tasks, group_1_tasks) is False
+ graph.add_dependency(group_2_tasks, group_1_tasks)
+ assert graph.has_dependency(group_2_tasks, group_1_tasks) is True
+
+ def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph):
+ task = MockTask()
+ parallel_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ parallel = graph.add_tasks(*parallel_tasks)
+ graph.add_dependency(task, parallel_tasks[1])
+ # only a partial dependency exists - has_dependency is expected to return False
+ assert graph.has_dependency(task, parallel) is False
+
+ def test_has_nonexistent_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ assert graph.has_dependency(group_1_tasks, group_2_tasks) is False
+
+ def test_remove_dependency(self, graph):
+ task = MockTask()
+ dependency_task = MockTask()
+ another_dependent_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task)
+ graph.add_tasks(another_dependent_task)
+ graph.add_dependency(task, dependency_task)
+ graph.add_dependency(another_dependent_task, dependency_task)
+
+ graph.remove_dependency(task, dependency_task)
+ remove_result = graph.has_dependency(task, dependency_task)
+ assert remove_result is False
+ assert graph.has_dependency(task, dependency_task) is False
+ assert graph.has_dependency(another_dependent_task, dependency_task) is True
+
+ def test_remove_nonexistent_dependency(self, graph):
+ task = MockTask()
+ dependency_task = MockTask()
+ graph.add_tasks(task)
+ graph.add_tasks(dependency_task)
+ # removing a dependency not in the graph should have no effect
+ graph.remove_dependency(task, dependency_task)
+ remove_result = graph.has_dependency(task, dependency_task)
+ assert remove_result is False
+ tasks = [t for t in graph.tasks]
+ assert set(tasks) == set([task, dependency_task])
+
+ def test_remove_dependency_nonexistent_dependent(self, graph):
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.remove_dependency(task_not_in_graph, task)
+
+ def test_remove_dependency_nonexistent_dependency(self, graph):
+ # in this test the dependency *task* is not in the graph, not just the dependency itself
+ task = MockTask()
+ task_not_in_graph = MockTask()
+ graph.add_tasks(task)
+ with pytest.raises(task_graph.TaskNotInGraphError):
+ graph.remove_dependency(task, task_not_in_graph)
+
+ def test_remove_dependency_empty_dependent(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # removing with an empty dependent should have no effect
+ graph.remove_dependency([], task)
+ assert set(graph.tasks) == set((task,))
+
+ def test_remove_dependency_empty_dependency(self, graph):
+ task = MockTask()
+ graph.add_tasks(task)
+ # removing with an empty dependency should have no effect
+ graph.remove_dependency(task, [])
+ assert set(graph.tasks) == set((task,))
+
+ def test_remove_dependency_dependent_group(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ graph.add_dependency(group_tasks, task)
+ graph.remove_dependency(group_tasks, task)
+ remove_result = graph.has_dependency(group_tasks, task)
+ assert remove_result is False
+ assert graph.has_dependency(group_tasks[0], task) is False
+ assert graph.has_dependency(group_tasks[1], task) is False
+ assert graph.has_dependency(group_tasks[2], task) is False
+
+ def test_remove_dependency_dependency_group(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ graph.add_dependency(task, group_tasks)
+ graph.remove_dependency(task, group_tasks)
+ remove_result = graph.has_dependency(task, group_tasks)
+ assert remove_result is False
+ assert graph.has_dependency(task, group_tasks[0]) is False
+ assert graph.has_dependency(task, group_tasks[1]) is False
+ assert graph.has_dependency(task, group_tasks[2]) is False
+
+ def test_remove_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ graph.add_dependency(group_2_tasks, group_1_tasks)
+ graph.remove_dependency(group_2_tasks, group_1_tasks)
+ remove_result = graph.has_dependency(group_2_tasks, group_1_tasks)
+ assert remove_result is False
+ for group_2_task in group_2_tasks:
+ assert graph.has_dependency(group_2_task, group_1_tasks[0]) is False
+ assert graph.has_dependency(group_2_task, group_1_tasks[1]) is False
+ assert graph.has_dependency(group_2_task, group_1_tasks[2]) is False
+
+ def test_remove_dependency_dependency_group_with_some_existing_dependencies(self, graph):
+ task = MockTask()
+ group_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(task)
+ graph.add_tasks(*group_tasks)
+ graph.add_dependency(task, group_tasks[1])
+ graph.remove_dependency(task, group_tasks)
+ remove_result = graph.has_dependency(task, group_tasks)
+ # only a partial dependency existed, so remove_dependency should have had no effect
+ assert remove_result is False
+ # no dependencies are expected to have changed
+ assert graph.has_dependency(task, group_tasks[0]) is False
+ assert graph.has_dependency(task, group_tasks[1]) is True
+ assert graph.has_dependency(task, group_tasks[2]) is False
+
+ def test_remove_nonexistent_dependency_between_groups(self, graph):
+ group_1_tasks = [MockTask() for _ in xrange(3)]
+ group_2_tasks = [MockTask() for _ in xrange(3)]
+ graph.add_tasks(*group_1_tasks)
+ graph.add_tasks(*group_2_tasks)
+ # removing a dependency not in the graph should have no effect
+ graph.remove_dependency(group_2_tasks, group_1_tasks)
+ remove_result = graph.has_dependency(group_2_tasks, group_1_tasks)
+ assert remove_result is False
+
+ # nested tests
+
+ def test_group_with_nested_sequence(self, graph):
+ all_tasks = [MockTask() for _ in xrange(5)]
+ graph.add_tasks(all_tasks[0],
+ graph.sequence(all_tasks[1], all_tasks[2], all_tasks[3]),
+ all_tasks[4])
+ assert set(graph.tasks) == set(all_tasks)
+
+ # tasks[2] and tasks[3] should each have a single dependency; the rest should have none
+ assert len(list(graph.get_dependencies(all_tasks[0]))) == 0
+ assert len(list(graph.get_dependencies(all_tasks[1]))) == 0
+ assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]])
+ assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]])
+ assert len(list(graph.get_dependencies(all_tasks[4]))) == 0
+
+ def test_group_with_nested_group(self, graph):
+ tasks = [MockTask() for _ in xrange(5)]
+ graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set(tasks)
+ # none of the tasks should have any dependencies
+ for i in xrange(len(tasks)):
+ assert len(list(graph.get_dependencies(tasks[i]))) == 0
+
+ def test_group_with_recursively_nested_group(self, graph):
+ recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
+ nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks]
+ tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+ graph.add_tasks(*tasks)
+
+ assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)
+ for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]:
+ for i in xrange(len(tasks_list[:3])):
+ assert len(list(graph.get_dependencies(tasks_list[i]))) == 0
+
+ def test_group_with_recursively_nested_group_and_interdependencies(self, graph):
+ recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
+ nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks]
+ tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+ graph.add_tasks(*tasks)
+
+ graph.add_dependency(tasks[2], nested_tasks[2])
+ graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0])
+ graph.add_dependency(recursively_nested_tasks[1], tasks[0])
+
+ assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)
+ assert set(graph.get_dependencies(tasks[0])) == set()
+ assert set(graph.get_dependencies(tasks[1])) == set()
+ assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]])
+
+ assert set(graph.get_dependencies(nested_tasks[0])) == set()
+ assert set(graph.get_dependencies(nested_tasks[1])) == set([recursively_nested_tasks[0]])
+ assert set(graph.get_dependencies(nested_tasks[2])) == set()
+
+ assert set(graph.get_dependencies(recursively_nested_tasks[0])) == set()
+ assert set(graph.get_dependencies(recursively_nested_tasks[1])) == set([tasks[0]])
+ assert set(graph.get_dependencies(recursively_nested_tasks[2])) == set()
+
+
+class TestTaskGraphSequence(object):
+
+ def test_sequence(self, graph):
+ tasks = [MockTask(), MockTask(), MockTask()]
+ graph.sequence(*tasks)
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set(tasks)
+ assert len(list(graph.get_dependencies(tasks[0]))) == 0
+ assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]])
+ assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]])
+
+ def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph):
+ # tests both that tasks which weren't previously in the graph get inserted, and
+ # that existing tasks don't get re-added to the graph
+ tasks = [MockTask(), MockTask(), MockTask()]
+ # insert some tasks and dependencies to the graph
+ graph.add_tasks(tasks[1])
+ graph.add_tasks(tasks[2])
+ graph.add_dependency(tasks[2], tasks[1])
+
+ graph.sequence(*tasks)
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set(tasks)
+ assert len(list(graph.get_dependencies(tasks[0]))) == 0
+ assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]])
+ assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]])
+
+ def test_sequence_with_nested_sequence(self, graph):
+ tasks = [MockTask() for _ in xrange(5)]
+ graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4])
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set(tasks)
+ # first task should have no dependencies
+ assert len(list(graph.get_dependencies(tasks[0]))) == 0
+ assert len(list(graph.get_dependencies(tasks[1]))) == 1
+ assert len(list(graph.get_dependencies(tasks[2]))) == 2
+ assert len(list(graph.get_dependencies(tasks[3]))) == 2
+ assert len(list(graph.get_dependencies(tasks[4]))) == 3
+
+ def test_sequence_with_nested_group(self, graph):
+ tasks = [MockTask() for _ in xrange(5)]
+ graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set(tasks)
+ # first task should have no dependencies
+ assert len(list(graph.get_dependencies(tasks[0]))) == 0
+ # rest of the tasks (except last) should have a single dependency - the first task
+ for i in xrange(1, 4):
+ assert set(graph.get_dependencies(tasks[i])) == set([tasks[0]])
+ # the last task should have a dependency on all tasks except for the first one
+ assert set(graph.get_dependencies(tasks[4])) == set([tasks[1], tasks[2], tasks[3]])
+
+ def test_sequence_with_recursively_nested_group(self, graph):
+ recursively_nested_group = [MockTask(), MockTask()]
+ nested_group = [MockTask(), recursively_nested_group, MockTask()]
+ sequence_tasks = [MockTask(), nested_group, MockTask()]
+
+ graph.sequence(*sequence_tasks)
+ graph_tasks = [t for t in graph.tasks]
+ assert set(graph_tasks) == set([sequence_tasks[0], nested_group[0],
+ recursively_nested_group[0], recursively_nested_group[1],
+ nested_group[2], sequence_tasks[2]])
+
+ assert list(graph.get_dependencies(nested_group[0])) == [sequence_tasks[0]]
+ assert list(graph.get_dependencies(recursively_nested_group[0])) == [sequence_tasks[0]]
+ assert list(graph.get_dependencies(recursively_nested_group[1])) == [sequence_tasks[0]]
+ assert list(graph.get_dependencies(nested_group[2])) == [sequence_tasks[0]]
+
+ assert list(graph.get_dependents(nested_group[0])) == [sequence_tasks[2]]
+ assert list(graph.get_dependents(recursively_nested_group[0])) == [sequence_tasks[2]]
+ assert list(graph.get_dependents(recursively_nested_group[1])) == [sequence_tasks[2]]
+ assert list(graph.get_dependents(nested_group[2])) == [sequence_tasks[2]]
+
+ def test_sequence_with_empty_group(self, graph):
+ tasks = [MockTask(), [], MockTask()]
+ graph.sequence(*tasks)
+ graph_tasks = set([t for t in graph.tasks])
+ assert graph_tasks == set([tasks[0], tasks[2]])
+ assert list(graph.get_dependents(tasks[0])) == [tasks[2]]
+ assert list(graph.get_dependencies(tasks[2])) == [tasks[0]]
+
+ def test_sequence_with_recursively_nested_sequence_and_interdependencies(self, graph):
+ recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), MockTask()))
+ nested_tasks = list(graph.sequence(MockTask(),
+ MockTask(),
+ MockTask(),
+ recursively_nested_tasks))
+ tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+ graph.sequence(*tasks)
+
+ assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks)
+ assert set(graph.get_dependencies(tasks[0])) == set()
+ for i in xrange(1, len(tasks[:-1])):
+ assert set(graph.get_dependencies(tasks[i])) == set([tasks[i - 1]])
+
+ assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]])
+ for i in xrange(1, len(nested_tasks[:-1])):
+ assert set(graph.get_dependencies(nested_tasks[i])) == \
+ set([tasks[2], nested_tasks[i-1]])
+
+ assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \
+ set([tasks[2], nested_tasks[2]])
+ for i in xrange(1, len(recursively_nested_tasks[:-1])):
+ assert set(graph.get_dependencies(recursively_nested_tasks[i])) == \
+ set([tasks[2], nested_tasks[2], recursively_nested_tasks[i-1]])
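
Taken together, these tests document the TaskGraph surface: add_tasks() accepts tasks or arbitrarily nested groups, add_dependency()/remove_dependency() accept single tasks or groups on either side, and sequence() chains whatever it is given. A short usage sketch, assuming only the calls exercised above (MockTask is the same stub the test file defines):

    from aria.orchestrator.workflows.api import task, task_graph

    class MockTask(task.BaseTask):
        # minimal stub, as in the tests
        def __init__(self):
            super(MockTask, self).__init__(ctx={})

    graph = task_graph.TaskGraph(name='example-graph')
    first, second, third = MockTask(), MockTask(), MockTask()

    # nested groups are flattened; tasks already in the graph are skipped
    graph.add_tasks(first, [second, third])

    # the dependent comes first: second and third will each depend on first
    graph.add_dependency([second, third], first)
    assert graph.has_dependency(second, first) is True

    # sequence() adds a dependency from each task on its predecessor
    graph.sequence(first, second, third)
    assert set(graph.get_dependencies(third)) == set([first, second])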
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py
new file mode 100644
index 0000000..1809f82
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria.orchestrator.workflows.builtin import workflows
+
+
+def _assert_relationships(operations, expected_op_full_name, relationships=0):
+ """
+
+ :param operations: and iterable of operations
+ :param expected_op_full_name: Note that source/target doesn't really matter since they are
+ dropped
+ :param relationships: the number of relationships
+ :return:
+ """
+ expected_op_name = expected_op_full_name.rsplit('_', 1)[0]
+ for _ in xrange(relationships):
+ # Since the target and source operations start off the same way, we only need to
+ # retrieve the suffix once
+ operation = next(operations)
+ relationship_id_1 = operation.actor.id
+ _assert_cfg_interface_op(operation, expected_op_name)
+
+ operation = next(operations)
+ relationship_id_2 = operation.actor.id
+ _assert_cfg_interface_op(operation, expected_op_name)
+
+ assert relationship_id_1 == relationship_id_2
+
+
+def assert_node_install_operations(operations, relationships=0):
+ operations = iter(operations)
+
+ _assert_std_interface_op(next(operations), workflows.NORMATIVE_CREATE)
+ _assert_relationships(operations, workflows.NORMATIVE_PRE_CONFIGURE_SOURCE, relationships)
+ _assert_std_interface_op(next(operations), workflows.NORMATIVE_CONFIGURE)
+ _assert_relationships(operations, workflows.NORMATIVE_POST_CONFIGURE_SOURCE, relationships)
+ _assert_std_interface_op(next(operations), workflows.NORMATIVE_START)
+ _assert_relationships(operations, workflows.NORMATIVE_ADD_SOURCE, relationships)
+
+
+def assert_node_uninstall_operations(operations, relationships=0):
+ operations = iter(operations)
+
+ _assert_std_interface_op(next(operations), workflows.NORMATIVE_STOP)
+ _assert_relationships(operations, workflows.NORMATIVE_REMOVE_SOURCE, relationships)
+ _assert_std_interface_op(next(operations), workflows.NORMATIVE_DELETE)
+
+
+def _assert_cfg_interface_op(op, operation_name):
+ # We need to remove the source/target
+ assert op.operation_name.rsplit('_', 1)[0] == operation_name
+ assert op.interface_name == workflows.NORMATIVE_CONFIGURE_INTERFACE
+
+
+def _assert_std_interface_op(op, operation_name):
+ assert op.operation_name == operation_name
+ assert op.interface_name == workflows.NORMATIVE_STANDARD_INTERFACE
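
These helpers are consumed by the install/uninstall workflow tests that follow. A sketch of the intended call pattern, mirroring test_install.py below (check_install_order is an illustrative name, and relationships must match each node's actual relationship count):

    from aria.orchestrator.workflows.api import task
    from aria.orchestrator.workflows.builtin.install import install

    def check_install_order(ctx, relationships=0):
        # each item in the workflow's topological order is a per-node subgraph
        subgraphs = list(task.WorkflowTask(install, ctx=ctx).topological_order(reverse=True))
        for subgraph in subgraphs:
            node_tasks = list(subgraph.topological_order(reverse=True))
            assert_node_install_operations(node_tasks, relationships=relationships)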
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py
new file mode 100644
index 0000000..8713e3c
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.execute_operation import execute_operation
+
+from tests import mock, storage
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir), inmemory=False)
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+def test_execute_operation(ctx):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='test')
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ execute_tasks = list(
+ task.WorkflowTask(
+ execute_operation,
+ ctx=ctx,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ operation_kwargs={},
+ allow_kwargs_override=False,
+ run_by_dependency_order=False,
+ type_names=[],
+ node_template_ids=[],
+ node_ids=[node.id]
+ ).topological_order()
+ )
+
+ assert len(execute_tasks) == 1
+ assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node
+ assert execute_tasks[0].operation_name == operation_name
+ assert execute_tasks[0].interface_name == interface_name
+
+
+# TODO: add more scenarios
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py
new file mode 100644
index 0000000..0a422bd
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.heal import heal
+
+from tests import mock, storage
+
+from . import (assert_node_install_operations, assert_node_uninstall_operations)
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+@pytest.mark.skip(reason='heal is not implemented for now')
+def test_heal_dependent_node(ctx):
+ dependent_node = \
+ ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ dependent_node.host_fk = dependent_node.id
+ ctx.model.node.update(dependent_node)
+ heal_graph = task.WorkflowTask(heal, ctx=ctx, node_id=dependent_node.id)
+
+ assert len(list(heal_graph.tasks)) == 2
+ uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))
+
+ assert len(list(uninstall_subgraph.tasks)) == 2
+ dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = \
+ list(uninstall_subgraph.topological_order(reverse=True))
+
+ assert len(list(install_subgraph.tasks)) == 2
+ dependency_node_subgraph_install, dependent_node_subgraph_install = \
+ list(install_subgraph.topological_order(reverse=True))
+
+ dependent_node_uninstall_tasks = \
+ list(dependent_node_subgraph_uninstall.topological_order(reverse=True))
+ assert isinstance(dependency_node_subgraph_uninstall, task.StubTask)
+ dependent_node_install_tasks = \
+ list(dependent_node_subgraph_install.topological_order(reverse=True))
+ assert isinstance(dependency_node_subgraph_install, task.StubTask)
+
+ assert_node_uninstall_operations(dependent_node_uninstall_tasks, relationships=1)
+ assert_node_install_operations(dependent_node_install_tasks, relationships=1)
+
+
+@pytest.mark.skip(reason='heal is not implemented for now')
+def test_heal_dependency_node(ctx):
+ dependency_node = \
+ ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ dependency_node.host_fk = dependency_node.id
+ ctx.model.node.update(dependency_node)
+ heal_graph = task.WorkflowTask(heal, ctx=ctx, node_id=dependency_node.id)
+ # both subgraphs should contain un/install operations for both the dependent and the dependency
+ assert len(list(heal_graph.tasks)) == 2
+ uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))
+
+ uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True))
+ assert len(uninstall_tasks) == 4
+ unlink_source, unlink_target = uninstall_tasks[:2]
+ dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = uninstall_tasks[2:]
+
+ install_tasks = list(install_subgraph.topological_order(reverse=True))
+ assert len(install_tasks) == 4
+ dependency_node_subgraph_install, dependent_node_subgraph_install = install_tasks[:2]
+ establish_source, establish_target = install_tasks[2:]
+
+ assert isinstance(dependent_node_subgraph_uninstall, task.StubTask)
+ dependency_node_uninstall_tasks = \
+ list(dependency_node_subgraph_uninstall.topological_order(reverse=True))
+ assert isinstance(dependent_node_subgraph_install, task.StubTask)
+ dependency_node_install_tasks = \
+ list(dependency_node_subgraph_install.topological_order(reverse=True))
+
+ assert unlink_source.name.startswith('aria.interfaces.relationship_lifecycle.unlink')
+ assert unlink_target.name.startswith('aria.interfaces.relationship_lifecycle.unlink')
+ assert_node_uninstall_operations(dependency_node_uninstall_tasks)
+
+ assert_node_install_operations(dependency_node_install_tasks)
+ assert establish_source.name.startswith('aria.interfaces.relationship_lifecycle.establish')
+ assert establish_target.name.startswith('aria.interfaces.relationship_lifecycle.establish')
+
+
+# TODO: add tests for the contained-in scenario
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py
new file mode 100644
index 0000000..1a4e1f9
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.install import install
+
+from tests import mock
+from tests import storage
+
+from . import assert_node_install_operations
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir),
+ topology=mock.topology.create_simple_topology_three_nodes)
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+def test_install(ctx):
+
+ install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(reverse=True))
+
+ assert len(install_tasks) == 3
+ dependency_node_subgraph1, dependency_node_subgraph2, dependent_node_subgraph = install_tasks
+ dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
+ dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True))
+ dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True))
+
+ assert_node_install_operations(dependency_node1_tasks)
+ assert_node_install_operations(dependency_node2_tasks)
+ assert_node_install_operations(dependent_node_tasks, relationships=2)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py
new file mode 100644
index 0000000..aa04c38
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.uninstall import uninstall
+
+from tests import mock
+from tests import storage
+
+from . import assert_node_uninstall_operations
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir),
+ topology=mock.topology.create_simple_topology_three_nodes)
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+def test_uninstall(ctx):
+
+ uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(reverse=True))
+
+ assert len(uninstall_tasks) == 3
+ dependent_node_subgraph, dependency_node_subgraph1, dependency_node_subgraph2 = uninstall_tasks
+ dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
+ dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True))
+ dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True))
+
+ assert_node_uninstall_operations(operations=dependency_node1_tasks)
+ assert_node_uninstall_operations(operations=dependency_node2_tasks)
+ assert_node_uninstall_operations(operations=dependent_node_tasks, relationships=2)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py
new file mode 100644
index 0000000..0c704f5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import threading
+from datetime import datetime
+
+import pytest
+
+from aria.orchestrator import (
+ events,
+ workflow,
+ operation,
+)
+from aria.modeling import models
+from aria.orchestrator.workflows import (
+ api,
+ exceptions,
+)
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import thread
+
+from tests import mock, storage
+
+
+global_test_holder = {}
+
+
+class BaseTest(object):
+
+ @classmethod
+ def _execute(cls, workflow_func, workflow_context, executor):
+ eng = cls._engine(workflow_func=workflow_func,
+ workflow_context=workflow_context,
+ executor=executor)
+ eng.execute(ctx=workflow_context)
+ return eng
+
+ @staticmethod
+ def _engine(workflow_func, workflow_context, executor):
+ graph = workflow_func(ctx=workflow_context)
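+        # compiling translates the api-level task graph into persisted model tasks;
+        # the engine then receives a mapping from executor class to executor instance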
+ graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+
+ return engine.Engine(executors={executor.__class__: executor})
+
+ @staticmethod
+ def _create_interface(ctx, func, arguments=None):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if arguments:
+            # the operation must declare its arguments before they can be passed in
+ operation_kwargs['arguments'] = arguments
+ operation_name = 'create'
+ interface = mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _op(node,
+ operation_name,
+ arguments=None,
+ max_attempts=None,
+ retry_interval=None,
+ ignore_failure=None):
+
+ return api.task.OperationTask(
+ node,
+ interface_name='aria.interfaces.lifecycle',
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval,
+ ignore_failure=ignore_failure,
+ )
+
+ @pytest.fixture(autouse=True)
+ def globals_cleanup(self):
+ try:
+ yield
+ finally:
+ global_test_holder.clear()
+
+ @pytest.fixture(autouse=True)
+    def signals_registration(self):
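+        # stub tasks (workflow start/end markers) also emit sent_task_signal,
+        # so count only real operation tasks, which carry no _stub_type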
+ def sent_task_handler(ctx, *args, **kwargs):
+ if ctx.task._stub_type is None:
+ calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+ global_test_holder['sent_task_signal_calls'] = calls + 1
+
+ def start_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('start')
+
+ def success_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('success')
+
+ def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+ workflow_context.states.append('failure')
+ workflow_context.exception = exception
+
+ def cancel_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('cancel')
+
+ events.start_workflow_signal.connect(start_workflow_handler)
+ events.on_success_workflow_signal.connect(success_workflow_handler)
+ events.on_failure_workflow_signal.connect(failure_workflow_handler)
+ events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
+ events.sent_task_signal.connect(sent_task_handler)
+ try:
+ yield
+ finally:
+ events.start_workflow_signal.disconnect(start_workflow_handler)
+ events.on_success_workflow_signal.disconnect(success_workflow_handler)
+ events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+ events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
+ events.sent_task_signal.disconnect(sent_task_handler)
+
+ @pytest.fixture
+ def executor(self):
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @pytest.fixture
+ def workflow_context(self, tmpdir):
+ workflow_context = mock.context.simple(str(tmpdir))
+ workflow_context.states = []
+ workflow_context.exception = None
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+
+class TestEngine(BaseTest):
+
+ def test_empty_graph_execution(self, workflow_context, executor):
+ @workflow
+ def mock_workflow(**_):
+ pass
+ self._execute(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert 'sent_task_signal_calls' not in global_test_holder
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is None
+ assert execution.status == models.Execution.SUCCEEDED
+
+ def test_single_task_successful_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(self._op(node, operation_name))
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+ def test_single_task_failed_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(self._op(node, operation_name))
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is not None
+ assert execution.status == models.Execution.FAILED
+
+ def test_two_tasks_execution_order(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = self._op(node, operation_name, arguments={'counter': 2})
+ graph.sequence(op1, op2)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('invocations') == [1, 2]
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
+ @workflow
+ def sub_workflow(ctx, graph):
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = api.task.StubTask()
+ op3 = self._op(node, operation_name, arguments={'counter': 2})
+ graph.sequence(op1, op2, op3)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+ self._execute(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('invocations') == [1, 2]
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestCancel(BaseTest):
+
+ def test_cancel_started_execution(self, workflow_context, executor):
+ number_of_tasks = 100
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_sleep_task, {'seconds': 0.1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ operations = (
+ self._op(node, operation_name, arguments=dict(seconds=0.1))
+ for _ in range(number_of_tasks)
+ )
+ return graph.sequence(*operations)
+
+ eng = self._engine(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context))
+ t.daemon = True
+ t.start()
+ time.sleep(10)
+ eng.cancel_execution(workflow_context)
+ t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow
+        assert not t.is_alive()  # join() does not raise on timeout, so verify the thread exited
+ assert workflow_context.states == ['start', 'cancel']
+ assert workflow_context.exception is None
+ invocations = global_test_holder.get('invocations', [])
+ assert 0 < len(invocations) < number_of_tasks
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is None
+ assert execution.status == models.Execution.CANCELLED
+
+ def test_cancel_pending_execution(self, workflow_context, executor):
+ @workflow
+ def mock_workflow(graph, **_):
+ return graph
+ eng = self._engine(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ eng.cancel_execution(workflow_context)
+ execution = workflow_context.execution
+ assert execution.status == models.Execution.CANCELLED
+
+
+class TestRetries(BaseTest):
+
+ def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=2)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 2},
+ max_attempts=2)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=3)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 2},
+ max_attempts=3)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 3
+ assert global_test_holder.get('sent_task_signal_calls') == 3
+
+ def test_infinite_retries(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=-1)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_retry_interval_float(self, workflow_context, executor):
+ self._test_retry_interval(retry_interval=0.3,
+ workflow_context=workflow_context,
+ executor=executor)
+
+ def test_retry_interval_int(self, workflow_context, executor):
+ self._test_retry_interval(retry_interval=1,
+ workflow_context=workflow_context,
+ executor=executor)
+
+ def _test_retry_interval(self, retry_interval, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=2,
+ retry_interval=retry_interval)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
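+        # mock_conditional_failure_task appends a timestamp on every invocation, so the
+        # gap between consecutive entries must be at least the configured retry interval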
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ invocation1, invocation2 = invocations
+ assert invocation2 - invocation1 >= retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_ignore_failure(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ ignore_failure=True,
+ arguments={'failure_count': 100},
+ max_attempts=100)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 1
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+class TestTaskRetryAndAbort(BaseTest):
+ message = 'EXPECTED_ERROR'
+
+ def test_task_retry_default_interval(self, workflow_context, executor):
+ default_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message},
+ retry_interval=default_retry_interval,
+ max_attempts=2)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ invocation1, invocation2 = invocations
+ assert invocation2 - invocation1 >= default_retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_task_retry_custom_interval(self, workflow_context, executor):
+ default_retry_interval = 100
+ custom_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message,
+ 'retry_interval': custom_retry_interval})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message,
+ 'retry_interval': custom_retry_interval},
+ retry_interval=default_retry_interval,
+ max_attempts=2)
+ graph.add_tasks(op)
+ execution_start = time.time()
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ execution_end = time.time()
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ assert (execution_end - execution_start) < default_retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_task_abort(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_abort, {'message': self.message})
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message},
+ retry_interval=100,
+ max_attempts=100)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 1
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+@operation
+def mock_success_task(**_):
+ pass
+
+
+@operation
+def mock_failed_task(**_):
+ raise RuntimeError
+
+
+@operation
+def mock_ordered_task(counter, **_):
+ invocations = global_test_holder.setdefault('invocations', [])
+ invocations.append(counter)
+
+
+@operation
+def mock_conditional_failure_task(failure_count, **_):
+ invocations = global_test_holder.setdefault('invocations', [])
+ try:
+ if len(invocations) < failure_count:
+ raise RuntimeError
+ finally:
+ invocations.append(time.time())
+
+
+@operation
+def mock_sleep_task(seconds, **_):
+ _add_invocation_timestamp()
+ time.sleep(seconds)
+
+
+@operation
+def mock_task_retry(ctx, message, retry_interval=None, **_):
+ _add_invocation_timestamp()
+ retry_kwargs = {}
+ if retry_interval is not None:
+ retry_kwargs['retry_interval'] = retry_interval
+ ctx.task.retry(message, **retry_kwargs)
+
+
+@operation
+def mock_task_abort(ctx, message, **_):
+ _add_invocation_timestamp()
+ ctx.task.abort(message)
+
+
+def _add_invocation_timestamp():
+ invocations = global_test_holder.setdefault('invocations', [])
+ invocations.append(time.time())
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py
new file mode 100644
index 0000000..d804de5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.decorators import operation, workflow
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor.thread import ThreadExecutor
+from aria.orchestrator.workflows import api
+from aria.modeling.service_instance import NodeBase
+
+from tests import mock, storage
+
+global_test_dict = {} # used to capture transitional node state changes
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+# TODO another possible approach to writing these tests:
+# don't create a ctx for every test.
+# The problem is that if every test creates a workflow containing just one standard
+# lifecycle operation, then by the time the second test runs, the workflow fails, since
+# the execution tries to go from 'terminated' to 'pending'.
+# And if we write a workflow that contains all the lifecycle operations, then first we would
+# need to change the API of `mock.models.create_interface`, which many other tests use, and
+# second, it is unclear how to conveniently check all the state transitions during execution.
+
+TYPE_URI_NAME = 'tosca.interfaces.node.lifecycle.Standard'
+SHORTHAND_NAME = 'Standard'
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='create', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='configure', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='start', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='stop', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='delete', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='create', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure_shorthand_name(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='configure', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='start', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='stop', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='delete', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle1(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name='interface_name', op_name='op_name', executor=executor)
+ assert node.state == node.INITIAL
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle2(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name='interface_name', op_name='create', executor=executor)
+ assert node.state == node.INITIAL
+
+
+def run_operation_on_node(ctx, op_name, interface_name, executor):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ service=node.service,
+ interface_name=interface_name,
+ operation_name=op_name,
+ operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
+ node.interfaces[interface.name] = interface
+ graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile(
+ single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
+ )
+
+ eng = engine.Engine(executors={executor.__class__: executor})
+ eng.execute(ctx)
+ return node
+
+
+def run_standard_lifecycle_operation_on_node(ctx, op_name, executor):
+ return run_operation_on_node(ctx,
+ interface_name='aria.interfaces.lifecycle.Standard',
+ op_name=op_name,
+ executor=executor)
+
+
+def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, op_name):
+ assert global_test_dict['transitional_state'] == NodeBase._OP_TO_STATE[op_name]['transitional']
+ assert node.state == NodeBase._OP_TO_STATE[op_name]['finished']
+
+
+@workflow
+def single_operation_workflow(graph, node, interface_name, op_name, **_):
+ graph.add_tasks(api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op_name))
+
+
+@operation
+def func(ctx):
+ global_test_dict['transitional_state'] = ctx.node.state
+
+
+@pytest.fixture
+def executor():
+ result = ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py
new file mode 100644
index 0000000..2b3f7d7
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import (
+ datetime,
+ timedelta
+)
+
+import pytest
+
+from aria.orchestrator.context import workflow as workflow_context
+from aria.orchestrator.workflows import (
+ api,
+ exceptions,
+)
+from aria.modeling import models
+
+from tests import mock, storage
+
+NODE_INTERFACE_NAME = 'Standard'
+NODE_OPERATION_NAME = 'create'
+RELATIONSHIP_INTERFACE_NAME = 'Configure'
+RELATIONSHIP_OPERATION_NAME = 'pre_configure'
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+
+ relationship = context.model.relationship.list()[0]
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ RELATIONSHIP_INTERFACE_NAME,
+ RELATIONSHIP_OPERATION_NAME,
+ operation_kwargs=dict(function='test')
+ )
+ relationship.interfaces[interface.name] = interface
+ context.model.relationship.update(relationship)
+
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ NODE_INTERFACE_NAME,
+ NODE_OPERATION_NAME,
+ operation_kwargs=dict(function='test')
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+class TestOperationTask(object):
+
+ def _create_node_operation_task(self, ctx, node):
+ with workflow_context.current.push(ctx):
+ api_task = api.task.OperationTask(
+ node,
+ interface_name=NODE_INTERFACE_NAME,
+ operation_name=NODE_OPERATION_NAME)
+ model_task = models.Task.from_api_task(api_task, None)
+ return api_task, model_task
+
+ def _create_relationship_operation_task(self, ctx, relationship):
+ with workflow_context.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=RELATIONSHIP_INTERFACE_NAME,
+ operation_name=RELATIONSHIP_OPERATION_NAME)
+ core_task = models.Task.from_api_task(api_task, None)
+ return api_task, core_task
+
+ def test_node_operation_task_creation(self, ctx):
+ storage_plugin = mock.models.create_plugin('p1', '0.1')
+ storage_plugin_other = mock.models.create_plugin('p0', '0.0')
+ ctx.model.plugin.put(storage_plugin)
+ ctx.model.plugin.put(storage_plugin_other)
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ NODE_INTERFACE_NAME,
+ NODE_OPERATION_NAME,
+ operation_kwargs=dict(plugin=storage_plugin, function='test')
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+ api_task, model_task = self._create_node_operation_task(ctx, node)
+ assert model_task.name == api_task.name
+ assert model_task.function == api_task.function
+ assert model_task.actor == api_task.actor == node
+ assert model_task.arguments == api_task.arguments
+ assert model_task.plugin == storage_plugin
+
+ def test_relationship_operation_task_creation(self, ctx):
+ relationship = ctx.model.relationship.list()[0]
+ ctx.model.relationship.update(relationship)
+ _, model_task = self._create_relationship_operation_task(
+ ctx, relationship)
+ assert model_task.actor == relationship
+
+ @pytest.mark.skip("Currently not supported for model tasks")
+ def test_operation_task_edit_locked_attribute(self, ctx):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ _, core_task = self._create_node_operation_task(ctx, node)
+ now = datetime.utcnow()
+ with pytest.raises(exceptions.TaskException):
+ core_task.status = core_task.STARTED
+ with pytest.raises(exceptions.TaskException):
+ core_task.started_at = now
+ with pytest.raises(exceptions.TaskException):
+ core_task.ended_at = now
+ with pytest.raises(exceptions.TaskException):
+ core_task.attempts_count = 2
+ with pytest.raises(exceptions.TaskException):
+ core_task.due_at = now
+
+ @pytest.mark.skip("Currently not supported for model tasks")
+ def test_operation_task_edit_attributes(self, ctx):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ _, core_task = self._create_node_operation_task(ctx, node)
+ future_time = datetime.utcnow() + timedelta(seconds=3)
+
+ with core_task._update():
+ core_task.status = core_task.STARTED
+ core_task.started_at = future_time
+ core_task.ended_at = future_time
+ core_task.attempts_count = 2
+ core_task.due_at = future_time
+ assert core_task.status != core_task.STARTED
+ assert core_task.started_at != future_time
+ assert core_task.ended_at != future_time
+ assert core_task.attempts_count != 2
+ assert core_task.due_at != future_time
+
+ assert core_task.status == core_task.STARTED
+ assert core_task.started_at == future_time
+ assert core_task.ended_at == future_time
+ assert core_task.attempts_count == 2
+ assert core_task.due_at == future_time
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
new file mode 100644
index 0000000..e24c901
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from networkx import topological_sort, DiGraph
+
+from aria.modeling import models
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import graph_compiler
+from aria.orchestrator.workflows.executor import base
+from tests import mock
+from tests import storage
+
+
+def test_task_graph_into_execution_graph(tmpdir):
+ interface_name = 'Standard'
+ op1_name, op2_name, op3_name = 'create', 'configure', 'start'
+ workflow_context = mock.context.simple(str(tmpdir))
+ node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ op1_name,
+ operation_kwargs=dict(function='test')
+ )
+ interface.operations[op2_name] = mock.models.create_operation(op2_name) # pylint: disable=unsubscriptable-object
+ interface.operations[op3_name] = mock.models.create_operation(op3_name) # pylint: disable=unsubscriptable-object
+ node.interfaces[interface.name] = interface
+ workflow_context.model.node.update(node)
+
+ def sub_workflow(name, **_):
+ return api.task_graph.TaskGraph(name)
+
+ with context.workflow.current.push(workflow_context):
+ test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+ simple_before_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+ simple_after_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+
+ inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+ inner_task_1 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+ inner_task_2 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op2_name)
+ inner_task_3 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op3_name)
+ inner_task_graph.add_tasks(inner_task_1)
+ inner_task_graph.add_tasks(inner_task_2)
+ inner_task_graph.add_tasks(inner_task_3)
+ inner_task_graph.add_dependency(inner_task_2, inner_task_1)
+ inner_task_graph.add_dependency(inner_task_3, inner_task_1)
+ inner_task_graph.add_dependency(inner_task_3, inner_task_2)
+
+ test_task_graph.add_tasks(simple_before_task)
+ test_task_graph.add_tasks(simple_after_task)
+ test_task_graph.add_tasks(inner_task_graph)
+ test_task_graph.add_dependency(inner_task_graph, simple_before_task)
+ test_task_graph.add_dependency(simple_after_task, inner_task_graph)
+
+ compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor)
+ compiler.compile(test_task_graph)
+
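+    # the compiler wraps both the outer graph and the inner graph with Start/End stub
+    # tasks, so the five operation tasks compile into nine model tasks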
+ execution_tasks = topological_sort(_graph(workflow_context.execution.tasks))
+
+ assert len(execution_tasks) == 9
+
+ expected_tasks_names = [
+ '{0}-Start'.format(test_task_graph.id),
+ simple_before_task.id,
+ '{0}-Start'.format(inner_task_graph.id),
+ inner_task_1.id,
+ inner_task_2.id,
+ inner_task_3.id,
+ '{0}-End'.format(inner_task_graph.id),
+ simple_after_task.id,
+ '{0}-End'.format(test_task_graph.id)
+ ]
+
+ assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks]
+ assert all(isinstance(task, models.Task) for task in execution_tasks)
+    _assert_tasks(
+        iter(execution_tasks),
+        iter([simple_before_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task])
+    )
+ storage.release_sqlite_storage(workflow_context.model)
+
+
+def _assert_tasks(execution_tasks, api_tasks):
+ start_workflow_exec_task = next(execution_tasks)
+ assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW
+
+ before_exec_task = next(execution_tasks)
+ simple_before_task = next(api_tasks)
+ _assert_execution_is_api_task(before_exec_task, simple_before_task)
+ assert before_exec_task.dependencies == [start_workflow_exec_task]
+
+ start_subworkflow_exec_task = next(execution_tasks)
+ assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW
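+    # note: 'START_SUBWROFKLOW' (sic) is the constant's spelling as defined on models.Task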
+ assert start_subworkflow_exec_task.dependencies == [before_exec_task]
+
+ inner_exec_task_1 = next(execution_tasks)
+ inner_task_1 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_1, inner_task_1)
+ assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task]
+
+ inner_exec_task_2 = next(execution_tasks)
+ inner_task_2 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_2, inner_task_2)
+ assert inner_exec_task_2.dependencies == [inner_exec_task_1]
+
+ inner_exec_task_3 = next(execution_tasks)
+ inner_task_3 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_3, inner_task_3)
+ assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2])
+
+ end_subworkflow_exec_task = next(execution_tasks)
+ assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW
+ assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3]
+
+ after_exec_task = next(execution_tasks)
+ simple_after_task = next(api_tasks)
+ _assert_execution_is_api_task(after_exec_task, simple_after_task)
+ assert after_exec_task.dependencies == [end_subworkflow_exec_task]
+
+ end_workflow_exec_task = next(execution_tasks)
+ assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW
+ assert end_workflow_exec_task.dependencies == [after_exec_task]
+
+
+def _assert_execution_is_api_task(execution_task, api_task):
+ assert execution_task.name == api_task.name
+ assert execution_task.function == api_task.function
+ assert execution_task.actor == api_task.actor
+ assert execution_task.arguments == api_task.arguments
+
+
+def _get_task_by_name(task_name, graph):
+ return graph.node[task_name]['task']
+
+
+def _graph(tasks):
+ graph = DiGraph()
+ for task in tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
+
+ return graph
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py
new file mode 100644
index 0000000..99d0b39
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import uuid
+from contextlib import contextmanager
+
+import aria
+from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
+
+
+class MockContext(object):
+
+ INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
+ def __init__(self, storage, task_kwargs=None):
+ self.logger = logging.getLogger('mock_logger')
+ self._task_kwargs = task_kwargs or {}
+ self._storage = storage
+ self.task = MockTask(storage, **task_kwargs)
+ self.states = []
+ self.exception = None
+
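+    # used by the process executor to re-create this context in the task subprocess
+    # (see instantiate_from_dict)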
+ @property
+ def serialization_dict(self):
+ return {
+ 'context_cls': self.__class__,
+ 'context': {
+ 'storage_kwargs': self._storage.serialization_dict,
+ 'task_kwargs': self._task_kwargs
+ }
+ }
+
+ def __getattr__(self, item):
+ return None
+
+ def close(self):
+ pass
+
+ @property
+ def model(self):
+ return self._storage
+
+ @classmethod
+ def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
+ return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+ task_kwargs=(task_kwargs or {}))
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+
+
+class MockActor(object):
+ def __init__(self):
+ self.name = 'actor_name'
+
+
+class MockTask(object):
+
+ INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+ def __init__(self, model, function, arguments=None, plugin_fk=None):
+ self.function = self.name = function
+ self.plugin_fk = plugin_fk
+ self.arguments = arguments or {}
+ self.states = []
+ self.exception = None
+ self.id = str(uuid.uuid4())
+ self.logger = logging.getLogger()
+ self.attempts_count = 1
+ self.max_attempts = 1
+ self.ignore_failure = False
+ self.interface_name = 'interface_name'
+ self.operation_name = 'operation_name'
+ self.actor = MockActor()
+ self.node = self.actor
+ self.model = model
+
+ for state in models.Task.STATES:
+ setattr(self, state.upper(), state)
+
+ @property
+ def plugin(self):
+ return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py
new file mode 100644
index 0000000..32a68e0
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+import retrying
+
+try:
+ import celery as _celery
+ app = _celery.Celery()
+ app.conf.update(CELERY_RESULT_BACKEND='amqp://')
+except ImportError:
+ _celery = None
+ app = None
+
+import aria
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.workflows.executor import (
+ thread,
+ process,
+ # celery
+)
+
+import tests
+from . import MockContext
+
+
+def _get_function(func):
+ return '{module}.{func.__name__}'.format(module=__name__, func=func)
+
+
+def execute_and_assert(executor, storage=None):
+ expected_value = 'value'
+ successful_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_successful_task))
+ )
+ failing_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_failing_task))
+ )
+ task_with_inputs = MockContext(
+ storage,
+ task_kwargs=dict(function=_get_function(mock_task_with_input),
+ arguments={'input': models.Argument.wrap('input', 'value')})
+ )
+
+ for task in [successful_task, failing_task, task_with_inputs]:
+ executor.execute(task)
+
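+    # the executors run tasks asynchronously, so keep retrying the assertions until
+    # all signal handlers have fired or the retry delay is exhausted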
+ @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+ def assertion():
+ assert successful_task.states == ['start', 'success']
+ assert failing_task.states == ['start', 'failure']
+ assert task_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_task.exception, MockException)
+ assert isinstance(task_with_inputs.exception, MockException)
+ assert task_with_inputs.exception.message == expected_value
+ assertion()
+
+
+def test_thread_execute(thread_executor):
+ execute_and_assert(thread_executor)
+
+
+def test_process_execute(process_executor, storage):
+ execute_and_assert(process_executor, storage)
+
+
+def mock_successful_task(**_):
+ pass
+
+
+def mock_failing_task(**_):
+ raise MockException
+
+
+def mock_task_with_input(input, **_):
+ raise MockException(input)
+
+if app:
+ mock_successful_task = app.task(mock_successful_task)
+ mock_failing_task = app.task(mock_failing_task)
+ mock_task_with_input = app.task(mock_task_with_input)
+
+
+class MockException(Exception):
+ pass
+
+
+@pytest.fixture
+def storage(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
+
+
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ # (celery.CeleryExecutor, {'app': app})
+])
+def thread_executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
+
+
+@pytest.fixture
+def process_executor():
+    # the task subprocess needs to be able to import the tests module, so we explicitly
+    # add the repository root to its python path, as if the project were installed in
+    # editable mode
+    result = process.ProcessExecutor(python_path=tests.ROOT_DIR)
+ yield result
+ result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+ def start_handler(task, *args, **kwargs):
+ task.states.append('start')
+
+ def success_handler(task, *args, **kwargs):
+ task.states.append('success')
+
+ def failure_handler(task, exception, *args, **kwargs):
+ task.states.append('failure')
+ task.exception = exception
+
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
new file mode 100644
index 0000000..e050d18
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import Queue
+import subprocess
+
+import pytest
+import psutil
+import retrying
+
+import aria
+
+from aria import operation
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.utils.plugin import create as create_plugin
+from aria.orchestrator.workflows.executor import process
+
+import tests.storage
+import tests.resources
+from tests.helpers import FilesystemDataHolder
+from tests.fixtures import ( # pylint: disable=unused-import
+ plugins_dir,
+ plugin_manager,
+)
+from . import MockContext
+
+
+class TestProcessExecutor(object):
+
+ def test_plugin_execution(self, executor, mock_plugin, model, queue):
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+ )
+
+ executor.execute(ctx)
+ error = queue.get(timeout=60)
+        # tests/resources/plugins/mock-plugin1 is the plugin installed during this
+        # test's setup. The module mock_plugin1 contains a single operation named
+        # "operation", which calls an entry point defined in the plugin's setup.py.
+        # That entry point simply prints 'mock-plugin-output' to stdout. The
+        # "operation" operation that spawned the subprocess then raises a RuntimeError
+        # with the subprocess output as the error message, which is what we assert
+        # here. This test checks that both PYTHONPATH (operation) and PATH (entry
+        # point) are properly set up in the subprocess in which the task runs.
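+        # an illustrative sketch (not the actual plugin source; the entry-point name
+        # is hypothetical) of what such an operation might look like:
+        #
+        #     from aria import operation
+        #     import subprocess
+        #
+        #     @operation
+        #     def operation(**_):
+        #         output = subprocess.check_output(['mock-plugin1-entry-point'])
+        #         raise RuntimeError(output.strip())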
+ assert isinstance(error, RuntimeError)
+ assert error.message == 'mock-plugin-output'
+
+ def test_closed(self, executor, model):
+ executor.close()
+ with pytest.raises(RuntimeError) as exc_info:
+ executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
+ assert 'closed' in exc_info.value.message
+
+ def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+ freeze_script_path = str(tmpdir.join('freeze_script'))
+ with open(freeze_script_path, 'w+b') as f:
+ f.write(
+ '''import time
+while True:
+ time.sleep(5)
+ '''
+ )
+ holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        script_path_argument = models.Argument.wrap('freezing_script_path',
+                                                    freeze_script_path)
+
+ model.argument.put(holder_path_argument)
+ model.argument.put(script_path_argument)
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(
+ function='{0}.{1}'.format(__name__, freezing_task.__name__),
+ arguments=dict(holder_path=holder_path_argument,
+ freezing_script_path=script_path_argument)),
+ )
+
+ executor.execute(ctx)
+
+ @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+ def wait_for_extra_process_id():
+ return fs_test_holder.get('subproc', False)
+
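+        # reach into the executor's internals for the pid of the subprocess running the task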
+ task_pid = executor._tasks[ctx.task.id].proc.pid
+ extra_process_pid = wait_for_extra_process_id()
+
+ assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
+ executor.terminate(ctx.task.id)
+
+ # Give a chance to the processes to terminate
+ time.sleep(2)
+
+ # all processes should be either zombies or non existent
+ pids = [task_pid, extra_process_pid]
+ for pid in pids:
+ if pid in psutil.pids():
+ assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
+ else:
+ # making the test more readable
+ assert pid not in psutil.pids()
+
+
+@pytest.fixture
+def queue():
+ _queue = Queue.Queue()
+
+ def handler(_, exception=None, **kwargs):
+ _queue.put(exception)
+
+ events.on_success_task_signal.connect(handler)
+ events.on_failure_task_signal.connect(handler)
+ try:
+ yield _queue
+ finally:
+ events.on_success_task_signal.disconnect(handler)
+ events.on_failure_task_signal.disconnect(handler)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+@pytest.fixture
+def executor(plugin_manager):
+ result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def mock_plugin(plugin_manager, tmpdir):
+ source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+ plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
+ return plugin_manager.install(source=plugin_path)
+
+
+@pytest.fixture
+def model(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, freezing_script_path, **_):
+ holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path]).pid
+ while True:
+ time.sleep(5)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..86a2edf
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import fasteners
+import pytest
+
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+from tests import helpers
+
+
+@pytest.fixture
+def dataholder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = helpers.FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+ _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+ first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
+ if not first:
+ raise RuntimeError('MESSAGE')
+
+
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
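+ # Run the same operation twice in parallel; both invocations update the
+ # same node attribute under a lock handshake that forces the writes to
+ # overlap, exercising the storage-level concurrent-modification handling.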
+ def _node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ key = 'key'
+ first_value = 'value1'
+ second_value = 'value2'
+ arguments = {
+ 'lock_files': lock_files,
+ 'key': key,
+ 'first_value': first_value,
+ 'second_value': second_value,
+ 'holder_path': dataholder.path
+ }
+
+ node = _node(context)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments),
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments)
+ )
+
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ try:
+ execute_workflow(mock_workflow, context, executor)
+ except ExecutorException:
+ pass
+
+ props = _node(context).attributes
+ assert dataholder['invocations'] == 2
+ assert props[key].value == dataholder[key]
+
+ exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+ if expected_failure:
+ assert exceptions
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+ return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
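+ # Two-lock handshake: the first invocation to arrive takes lock 1 and spins
+ # until the second invocation holds lock 2, so both processes are alive and
+ # writing concurrently before either returns.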
+ holder = helpers.FilesystemDataHolder(holder_path)
+ locker1 = fasteners.InterProcessLock(lock_files[0])
+ locker2 = fasteners.InterProcessLock(lock_files[1])
+
+ first = locker1.acquire(blocking=False)
+
+ if first:
+ # spin until the second invocation has acquired its lock
+ while locker2.acquire(blocking=False):
+ locker2.release()
+ time.sleep(0.1)
+ else:
+ locker2.acquire()
+
+ node.attributes[key] = first_value if first else second_value
+ holder[key] = first_value if first else second_value
+ holder.setdefault('invocations', 0)
+ holder['invocations'] += 1
+
+ if first:
+ locker1.release()
+ else:
+ with locker1:
+ locker2.release()
+
+ return first
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py
new file mode 100644
index 0000000..b26fa43
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import extension
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests import mock
+from tests import storage
+
+
+def test_decorate_extension(context, executor):
+ arguments = {'arg1': 1, 'arg2': 2}
+
+ def get_node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ node = get_node(context)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ node = get_node(ctx)
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments)
+ graph.add_tasks(task)
+ return graph
+ graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
+ out = get_node(context).attributes.get('out').value
+ assert out['wrapper_arguments'] == arguments
+ assert out['function_arguments'] == arguments
+
+
+@extension.process_executor
+class MockProcessExecutorExtension(object):
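+ # Wraps every operation run by the process executor: the wrapper records
+ # the arguments it receives before delegating to the real function, letting
+ # the test compare wrapper and function arguments.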
+
+ def decorate(self):
+ def decorator(function):
+ def wrapper(ctx, **operation_arguments):
+ with ctx.model.instrument(ctx.model.node.model_cls.attributes):
+ ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
+ function(ctx=ctx, **operation_arguments)
+ return wrapper
+ return decorator
+
+
+@operation
+def _mock_operation(ctx, **operation_arguments):
+ ctx.node.attributes['out']['function_arguments'] = operation_arguments
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
new file mode 100644
index 0000000..47ee2f7
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+import pytest
+
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+from aria.orchestrator.workflows import exceptions
+
+import tests
+from tests import mock
+from tests import storage
+
+
+_TEST_ATTRIBUTES = {
+ 'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo'
+}
+
+
+def test_track_changes_of_successful_operation(context, executor):
+ _run_workflow(context=context, executor=executor, op_func=_mock_success_operation)
+ _assert_tracked_changes_are_applied(context)
+
+
+def test_track_changes_of_failed_operation(context, executor):
+ with pytest.raises(exceptions.ExecutorException):
+ _run_workflow(context=context, executor=executor, op_func=_mock_fail_operation)
+ _assert_tracked_changes_are_applied(context)
+
+
+def _assert_tracked_changes_are_applied(context):
+ instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ assert all(instance.attributes[key].value == value
+ for key, value in _TEST_ATTRIBUTES.items())
+
+
+def _update_attributes(context):
+ context.node.attributes.clear()
+ context.node.attributes.update(_TEST_ATTRIBUTES)
+
+
+def test_refresh_state_of_tracked_attributes(context, executor):
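+ # refresh() must keep attribute changes tracked by the instrumentation:
+ # the refreshed state equals the changed state, not the initial one.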
+ out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation)
+ assert out['after_refresh'] == out['after_change']
+ assert out['initial'] != out['after_change']
+
+
+def test_apply_tracked_changes_during_an_operation(context, executor):
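+ # 'committed' is pushed with an explicit model update mid-operation, while
+ # 'changed_but_refreshed' is only tracked; both must survive the refresh.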
+ arguments = {
+ 'committed': {'some': 'new', 'properties': 'right here'},
+ 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
+ }
+
+ expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes
+ out = _run_workflow(
+ context=context, executor=executor, op_func=_mock_updating_operation, arguments=arguments)
+
+ expected_after_update = expected_initial.copy()
+ expected_after_update.update(arguments['committed']) # pylint: disable=no-member
+ expected_after_change = expected_after_update.copy()
+ expected_after_change.update(arguments['changed_but_refreshed']) # pylint: disable=no-member
+
+ assert out['initial'] == expected_initial
+ assert out['after_update'] == expected_after_update
+ assert out['after_change'] == expected_after_change
+ assert out['after_refresh'] == expected_after_change
+
+
+def _run_workflow(context, executor, op_func, arguments=None):
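+ # Wire op_func as a single operation on the dependency node, run it through
+ # the process executor and return whatever the operation stored under 'out'.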
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ wf_arguments = arguments or {}
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=_operation_mapping(op_func),
+ arguments=wf_arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=wf_arguments)
+ graph.add_tasks(task)
+ return graph
+ graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
+ out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
+ return out.value if out else None
+
+
+@operation
+def _mock_success_operation(ctx):
+ _update_attributes(ctx)
+
+
+@operation
+def _mock_fail_operation(ctx):
+ _update_attributes(ctx)
+ raise RuntimeError
+
+
+@operation
+def _mock_refreshing_operation(ctx):
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update({'some': 'new', 'properties': 'right here'})
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
+ ctx.model.node.refresh(ctx.node)
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
+
+
+@operation
+def _mock_updating_operation(ctx, committed, changed_but_refreshed):
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update(committed)
+ ctx.model.node.update(ctx.node)
+ out['after_update'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes.update(changed_but_refreshed)
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
+ ctx.model.node.refresh(ctx.node)
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
+
+
+def _operation_mapping(func):
+ return '{name}.{func.__name__}'.format(name=__name__, func=func)
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py
new file mode 100644
index 0000000..8e3f9b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from contextlib import contextmanager
+
+
+@contextmanager
+def events_collector(*signals):
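+ # Context manager: connect a recording handler to each signal, expose the
+ # collected calls as {signal: [{'args': ..., 'kwargs': ...}, ...]}, and
+ # disconnect all handlers on exit.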
+ handlers = {}
+ collected = {}
+
+ def handler_factory(key):
+ def handler(*args, **kwargs):
+ signal_events = collected.setdefault(key, [])
+ signal_events.append({'args': args, 'kwargs': kwargs})
+ handlers[key] = handler
+ return handler
+
+ for signal in signals:
+ signal.connect(handler_factory(signal))
+ try:
+ yield collected
+ finally:
+ for signal in signals:
+ signal.disconnect(handlers[signal])