diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows')
21 files changed, 3289 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py new file mode 100644 index 0000000..7f0fd56 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import api, core diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py new file mode 100644 index 0000000..9d91b6b --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py @@ -0,0 +1,223 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from aria.orchestrator import context +from aria.orchestrator.workflows import api + +from tests import mock, storage + + +@pytest.fixture +def ctx(tmpdir): + """ + Create the following graph in storage: + dependency_node <------ dependent_node + :return: + """ + simple_context = mock.context.simple(str(tmpdir), inmemory=False) + simple_context.model.execution.put(mock.models.create_execution(simple_context.service)) + yield simple_context + storage.release_sqlite_storage(simple_context.model) + + +class TestOperationTask(object): + + def test_node_operation_task_creation(self, ctx): + interface_name = 'test_interface' + operation_name = 'create' + + plugin = mock.models.create_plugin('test_plugin', '0.1') + ctx.model.node.update(plugin) + + arguments = {'test_input': True} + + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(plugin=plugin, + function='op_path', + arguments=arguments),) + + node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + node.interfaces[interface_name] = interface + ctx.model.node.update(node) + max_attempts = 10 + retry_interval = 10 + ignore_failure = True + + with context.workflow.current.push(ctx): + api_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure) + + assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( + type='node', + name=node.name, + interface=interface_name, + operation=operation_name + ) + assert api_task.function == 'op_path' + assert api_task.actor == node + assert api_task.arguments['test_input'].value is True + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts + assert api_task.ignore_failure == ignore_failure + assert api_task.plugin.name == 'test_plugin' + + def test_source_relationship_operation_task_creation(self, ctx): + interface_name = 'test_interface' + operation_name = 'preconfigure' + + plugin = mock.models.create_plugin('test_plugin', '0.1') + ctx.model.plugin.update(plugin) + + arguments = {'test_input': True} + + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(plugin=plugin, + function='op_path', + arguments=arguments) + ) + + relationship = ctx.model.relationship.list()[0] + relationship.interfaces[interface.name] = interface + max_attempts = 10 + retry_interval = 10 + + with context.workflow.current.push(ctx): + api_task = api.task.OperationTask( + relationship, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval) + + assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( + type='relationship', + name=relationship.name, + interface=interface_name, + operation=operation_name + ) + assert api_task.function == 'op_path' + assert api_task.actor == relationship + assert api_task.arguments['test_input'].value is True + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts + assert api_task.plugin.name == 'test_plugin' + + def test_target_relationship_operation_task_creation(self, ctx): + interface_name = 'test_interface' + operation_name = 'preconfigure' + + plugin = mock.models.create_plugin('test_plugin', '0.1') + ctx.model.node.update(plugin) + + arguments = {'test_input': True} + + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(plugin=plugin, + function='op_path', + arguments=arguments) + ) + + relationship = ctx.model.relationship.list()[0] + relationship.interfaces[interface.name] = interface + max_attempts = 10 + retry_interval = 10 + + with context.workflow.current.push(ctx): + api_task = api.task.OperationTask( + relationship, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval) + + assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( + type='relationship', + name=relationship.name, + interface=interface_name, + operation=operation_name + ) + assert api_task.function == 'op_path' + assert api_task.actor == relationship + assert api_task.arguments['test_input'].value is True + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts + assert api_task.plugin.name == 'test_plugin' + + def test_operation_task_default_values(self, ctx): + interface_name = 'test_interface' + operation_name = 'create' + + plugin = mock.models.create_plugin('package', '0.1') + ctx.model.node.update(plugin) + + dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(plugin=plugin, + function='op_path')) + dependency_node.interfaces[interface_name] = interface + + with context.workflow.current.push(ctx): + task = api.task.OperationTask( + dependency_node, + interface_name=interface_name, + operation_name=operation_name) + + assert task.arguments == {} + assert task.retry_interval == ctx._task_retry_interval + assert task.max_attempts == ctx._task_max_attempts + assert task.ignore_failure == ctx._task_ignore_failure + assert task.plugin is plugin + + +class TestWorkflowTask(object): + + def test_workflow_task_creation(self, ctx): + + workspace = {} + + mock_class = type('mock_class', (object,), {'test_attribute': True}) + + def sub_workflow(**kwargs): + workspace.update(kwargs) + return mock_class + + with context.workflow.current.push(ctx): + workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg') + assert workflow_task.graph is mock_class + assert workflow_task.test_attribute is True diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py new file mode 100644 index 0000000..a569386 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task_graph.py @@ -0,0 +1,745 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task_graph, task + + +class MockTask(task.BaseTask): + def __init__(self): + super(MockTask, self).__init__(ctx={}) + + +@pytest.fixture +def graph(): + return task_graph.TaskGraph(name='mock-graph') + + +class TestTaskGraphTasks(object): + + def test_add_task(self, graph): + task = MockTask() + add_result = graph.add_tasks(task) + assert add_result == [task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_add_empty_group(self, graph): + result = graph.add_tasks([]) + assert result == [] + + def test_add_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == tasks + + def test_add_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == [tasks[0], tasks[2]] + + def test_add_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + added_tasks = graph.add_tasks(tasks) + assert added_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_add_existing_task(self, graph): + task = MockTask() + graph.add_tasks(task) + # adding a task already in graph - should have no effect, and return False + add_result = graph.add_tasks(task) + assert add_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_task(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + graph.remove_tasks(other_task) + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_tasks_with_dependency(self, graph): + task = MockTask() + dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependent_task) + graph.add_dependency(dependent_task, task) + remove_result = graph.remove_tasks(dependent_task) + assert remove_result == [dependent_task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + # asserting no dependencies are left for the dependent task + assert len(list(graph.get_dependencies(task))) == 0 + + def test_remove_empty_group(self, graph): + result = graph.remove_tasks([]) + assert result == [] + + def test_remove_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == tasks + + def test_remove_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == [task] + + def test_remove_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + removed_tasks = graph.remove_tasks(tasks) + assert removed_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_remove_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + # removing a task not in graph - should have no effect, and return False + remove_result = graph.remove_tasks(task_not_in_graph) + assert remove_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_has_task(self, graph): + task = MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task) is True + + def test_has_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task_not_in_graph) is False + + def test_has_empty_group(self, graph): + # the "empty task" is in the graph + assert graph.has_tasks([]) is True + + def test_has_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + assert graph.has_tasks(*tasks) is True + + def test_has_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + assert graph.has_tasks(tasks) is False + + def test_has_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + assert graph.has_tasks(tasks) is True + + def test_get_task(self, graph): + task = MockTask() + graph.add_tasks(task) + assert graph.get_task(task.id) == task + + def test_get_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.get_task(task_not_in_graph.id) + + +class TestTaskGraphGraphTraversal(object): + + def test_tasks_iteration(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, other_task]) + + def test_get_dependents(self, graph): + task = MockTask() + dependent_task_1 = MockTask() + dependent_task_2 = MockTask() + transitively_dependent_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependent_task_1) + graph.add_tasks(dependent_task_2) + graph.add_tasks(transitively_dependent_task) + + graph.add_dependency(dependent_task_1, task) + graph.add_dependency(dependent_task_2, task) + graph.add_dependency(transitively_dependent_task, dependent_task_2) + + dependent_tasks = list(graph.get_dependents(task)) + # transitively_dependent_task not expected to appear in the result + assert set(dependent_tasks) == set([dependent_task_1, dependent_task_2]) + + def test_get_task_empty_dependents(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependent_tasks = list(graph.get_dependents(task)) + assert len(dependent_tasks) == 0 + + def test_get_nonexistent_task_dependents(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependents(task_not_in_graph)) + + def test_get_dependencies(self, graph): + task = MockTask() + dependency_task_1 = MockTask() + dependency_task_2 = MockTask() + transitively_dependency_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependency_task_1) + graph.add_tasks(dependency_task_2) + graph.add_tasks(transitively_dependency_task) + + graph.add_dependency(task, dependency_task_1) + graph.add_dependency(task, dependency_task_2) + graph.add_dependency(dependency_task_2, transitively_dependency_task) + + dependency_tasks = list(graph.get_dependencies(task)) + # transitively_dependency_task not expected to appear in the result + assert set(dependency_tasks) == set([dependency_task_1, dependency_task_2]) + + def test_get_task_empty_dependencies(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 0 + + def test_get_nonexistent_task_dependencies(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependencies(task_not_in_graph)) + + +class TestTaskGraphDependencies(object): + + def test_add_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + unrelated_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(unrelated_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_existing_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + # adding a dependency already in graph - should have no effect, and return False + assert add_result is True + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task_not_in_graph, task) + + def test_add_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task, task_not_in_graph) + + def test_add_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == set((task,)) + + def test_add_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == set((task,)) + + def test_add_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks[0], task) is True + assert graph.has_dependency(group_tasks[1], task) is True + assert graph.has_dependency(group_tasks[2], task) is True + + def test_add_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_add_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + # adding a dependency on a specific task manually, + # before adding a dependency on the whole parallel + graph.add_dependency(task, group_tasks[1]) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_existing_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + # adding a dependency already in graph - should have no effect, and return False + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_has_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + assert graph.has_dependency(task, dependency_task) is True + + def test_has_nonexistent_dependency(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + assert graph.has_dependency(task, other_task) is False + + def test_has_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task_not_in_graph, task) + + def test_has_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task, task_not_in_graph) + + def test_has_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be False - dependency in an empty form + assert graph.has_dependency([], task) is False + + def test_has_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be True - dependency in an empty form + assert graph.has_dependency(task, []) is False + + def test_has_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(group_tasks, task) is False + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks, task) is True + + def test_has_dependency_dependency_parallel(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(task, group_tasks) is False + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks) is True + + def test_has_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is False + graph.add_dependency(group_2_tasks, group_1_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is True + + def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + parallel = graph.add_tasks(*parallel_tasks) + graph.add_dependency(task, parallel_tasks[1]) + # only a partial dependency exists - has_dependency is expected to return False + assert graph.has_dependency(task, parallel) is False + + def test_has_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert graph.has_dependency(group_1_tasks, group_2_tasks) is False + + def test_remove_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + another_dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(another_dependent_task) + graph.add_dependency(task, dependency_task) + graph.add_dependency(another_dependent_task, dependency_task) + + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + assert graph.has_dependency(task, dependency_task) is False + assert graph.has_dependency(another_dependent_task, dependency_task) is True + + def test_remove_nonexistent_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, dependency_task]) + + def test_remove_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task_not_in_graph, task) + + def test_remove_dependency_nonexistent_dependency(self, graph): + # in this test the dependency *task* is not in the graph, not just the dependency itself + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task, task_not_in_graph) + + def test_remove_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency([], task) + assert set(graph.tasks) == set((task,)) + + def test_remove_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency(task, []) + assert set(graph.tasks) == set((task,)) + + def test_remove_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + graph.remove_dependency(group_tasks, task) + remove_result = graph.has_dependency(group_tasks, task) + assert remove_result is False + assert graph.has_dependency(group_tasks[0], task) is False + assert graph.has_dependency(group_tasks[1], task) is False + assert graph.has_dependency(group_tasks[2], task) is False + + def test_remove_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + assert remove_result is False + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is False + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_2_tasks, group_1_tasks) + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_2_task, group_1_tasks[0]) is False + assert graph.has_dependency(group_2_task, group_1_tasks[1]) is False + assert graph.has_dependency(group_2_task, group_1_tasks[2]) is False + + def test_remove_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks[1]) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + # only a partial dependency exists - remove_dependency is expected to return False + assert remove_result is False + # no dependencies are expected to have changed + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + + # nested tests + + def test_group_with_nested_sequence(self, graph): + all_tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(all_tasks[0], + graph.sequence(all_tasks[1], all_tasks[2], all_tasks[3]), + all_tasks[4]) + assert set(graph.tasks) == set(all_tasks) + + # tasks[2] and tasks[3] should each have a single dependency; the rest should have none + assert len(list(graph.get_dependencies(all_tasks[0]))) == 0 + assert len(list(graph.get_dependencies(all_tasks[1]))) == 0 + assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]]) + assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]]) + assert len(list(graph.get_dependencies(all_tasks[4]))) == 0 + + def test_group_with_nested_group(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # none of the tasks should have any dependencies + for i in xrange(len(tasks)): + assert len(list(graph.get_dependencies(tasks[i]))) == 0 + + def test_group_with_recursively_nested_group(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]: + for i in xrange(len(tasks_list[:3])): + assert len(list(graph.get_dependencies(tasks_list[i]))) == 0 + + def test_group_with_recursively_nested_group_and_interdependencies(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + graph.add_dependency(tasks[2], nested_tasks[2]) + graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0]) + graph.add_dependency(recursively_nested_tasks[1], tasks[0]) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + assert set(graph.get_dependencies(tasks[1])) == set() + assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set() + assert set(graph.get_dependencies(nested_tasks[1])) == set([recursively_nested_tasks[0]]) + assert set(graph.get_dependencies(nested_tasks[2])) == set() + + assert set(graph.get_dependencies(recursively_nested_tasks[0])) == set() + assert set(graph.get_dependencies(recursively_nested_tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(recursively_nested_tasks[2])) == set() + + +class TestTaskGraphSequence(object): + + def test_sequence(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph): + # tests both that tasks which werent previously in graph get inserted, and + # that existing tasks don't get re-added to graph + tasks = [MockTask(), MockTask(), MockTask()] + # insert some tasks and dependencies to the graph + graph.add_tasks(tasks[1]) + graph.add_tasks(tasks[2]) + graph.add_dependency(tasks[2], tasks[1]) + + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_nested_sequence(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert len(list(graph.get_dependencies(tasks[1]))) == 1 + assert len(list(graph.get_dependencies(tasks[2]))) == 2 + assert len(list(graph.get_dependencies(tasks[3]))) == 2 + assert len(list(graph.get_dependencies(tasks[4]))) == 3 + + def test_sequence_with_nested_group(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + # rest of the tasks (except last) should have a single dependency - the first task + for i in xrange(1, 4): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[0]]) + # last task should have have a dependency on all tasks except for the first one + assert set(graph.get_dependencies(tasks[4])) == set([tasks[1], tasks[2], tasks[3]]) + + def test_sequence_with_recursively_nested_group(self, graph): + recursively_nested_group = [MockTask(), MockTask()] + nested_group = [MockTask(), recursively_nested_group, MockTask()] + sequence_tasks = [MockTask(), nested_group, MockTask()] + + graph.sequence(*sequence_tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set([sequence_tasks[0], nested_group[0], + recursively_nested_group[0], recursively_nested_group[1], + nested_group[2], sequence_tasks[2]]) + + assert list(graph.get_dependencies(nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[1])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(nested_group[2])) == [sequence_tasks[0]] + + assert list(graph.get_dependents(nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[1])) == [sequence_tasks[2]] + assert list(graph.get_dependents(nested_group[2])) == [sequence_tasks[2]] + + def test_sequence_with_empty_group(self, graph): + tasks = [MockTask(), [], MockTask()] + graph.sequence(*tasks) + graph_tasks = set([t for t in graph.tasks]) + assert graph_tasks == set([tasks[0], tasks[2]]) + assert list(graph.get_dependents(tasks[0])) == [tasks[2]] + assert list(graph.get_dependencies(tasks[2])) == [tasks[0]] + + def test_sequence_with_recursively_nested_sequence_and_interdependencies(self, graph): + recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), MockTask())) + nested_tasks = list(graph.sequence(MockTask(), + MockTask(), + MockTask(), + recursively_nested_tasks)) + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.sequence(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + for i in xrange(1, len(tasks[:-1])): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[i - 1]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]]) + for i in xrange(1, len(nested_tasks[:-1])): + assert set(graph.get_dependencies(nested_tasks[i])) == \ + set([tasks[2], nested_tasks[i-1]]) + + assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \ + set([tasks[2], nested_tasks[2]]) + for i in xrange(1, len(recursively_nested_tasks[:-1])): + assert set(graph.get_dependencies(recursively_nested_tasks[i])) == \ + set([tasks[2], nested_tasks[2], recursively_nested_tasks[i-1]]) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py new file mode 100644 index 0000000..1809f82 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/__init__.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria.orchestrator.workflows.builtin import workflows + + +def _assert_relationships(operations, expected_op_full_name, relationships=0): + """ + + :param operations: and iterable of operations + :param expected_op_full_name: Note that source/target doesn't really matter since they are + dropped + :param relationships: the number of relationships + :return: + """ + expected_op_name = expected_op_full_name.rsplit('_', 1)[0] + for _ in xrange(relationships): + # Since the target and source operations start of the same way, we only need to retrieve the + # suffix once + operation = next(operations) + relationship_id_1 = operation.actor.id + _assert_cfg_interface_op(operation, expected_op_name) + + operation = next(operations) + relationship_id_2 = operation.actor.id + _assert_cfg_interface_op(operation, expected_op_name) + + assert relationship_id_1 == relationship_id_2 + + +def assert_node_install_operations(operations, relationships=0): + operations = iter(operations) + + _assert_std_interface_op(next(operations), workflows.NORMATIVE_CREATE) + _assert_relationships(operations, workflows.NORMATIVE_PRE_CONFIGURE_SOURCE, relationships) + _assert_std_interface_op(next(operations), workflows.NORMATIVE_CONFIGURE) + _assert_relationships(operations, workflows.NORMATIVE_POST_CONFIGURE_SOURCE, relationships) + _assert_std_interface_op(next(operations), workflows.NORMATIVE_START) + _assert_relationships(operations, workflows.NORMATIVE_ADD_SOURCE, relationships) + + +def assert_node_uninstall_operations(operations, relationships=0): + operations = iter(operations) + + _assert_std_interface_op(next(operations), workflows.NORMATIVE_STOP) + _assert_relationships(operations, workflows.NORMATIVE_REMOVE_SOURCE, relationships) + _assert_std_interface_op(next(operations), workflows.NORMATIVE_DELETE) + + +def _assert_cfg_interface_op(op, operation_name): + # We need to remove the source/target + assert op.operation_name.rsplit('_', 1)[0] == operation_name + assert op.interface_name == workflows.NORMATIVE_CONFIGURE_INTERFACE + + +def _assert_std_interface_op(op, operation_name): + assert op.operation_name == operation_name + assert op.interface_name == workflows.NORMATIVE_STANDARD_INTERFACE diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py new file mode 100644 index 0000000..8713e3c --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.execute_operation import execute_operation + +from tests import mock, storage + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir), inmemory=False) + yield context + storage.release_sqlite_storage(context.model) + + +def test_execute_operation(ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(function='test') + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + execute_tasks = list( + task.WorkflowTask( + execute_operation, + ctx=ctx, + interface_name=interface_name, + operation_name=operation_name, + operation_kwargs={}, + allow_kwargs_override=False, + run_by_dependency_order=False, + type_names=[], + node_template_ids=[], + node_ids=[node.id] + ).topological_order() + ) + + assert len(execute_tasks) == 1 + assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node + assert execute_tasks[0].operation_name == operation_name + assert execute_tasks[0].interface_name == interface_name + + +# TODO: add more scenarios diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py new file mode 100644 index 0000000..0a422bd --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_heal.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.heal import heal + +from tests import mock, storage + +from . import (assert_node_install_operations, assert_node_uninstall_operations) + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + yield context + storage.release_sqlite_storage(context.model) + + +@pytest.mark.skip(reason='heal is not implemented for now') +def test_heal_dependent_node(ctx): + dependent_node = \ + ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + dependent_node.host_fk = dependent_node.id + ctx.model.node.update(dependent_node) + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_id=dependent_node.id) + + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + assert len(list(uninstall_subgraph.tasks)) == 2 + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = \ + list(uninstall_subgraph.topological_order(reverse=True)) + + assert len(list(install_subgraph.tasks)) == 2 + dependency_node_subgraph_install, dependent_node_subgraph_install = \ + list(install_subgraph.topological_order(reverse=True)) + + dependent_node_uninstall_tasks = \ + list(dependent_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_uninstall, task.StubTask) + dependent_node_install_tasks = \ + list(dependent_node_subgraph_install.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_install, task.StubTask) + + assert_node_uninstall_operations(dependent_node_uninstall_tasks, relationships=1) + assert_node_install_operations(dependent_node_install_tasks, relationships=1) + + +@pytest.mark.skip(reason='heal is not implemented for now') +def test_heal_dependency_node(ctx): + dependency_node = \ + ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + dependency_node.host_fk = dependency_node.id + ctx.model.node.update(dependency_node) + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_id=dependency_node.id) + # both subgraphs should contain un\install for both the dependent and the dependency + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True)) + assert len(uninstall_tasks) == 4 + unlink_source, unlink_target = uninstall_tasks[:2] + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = uninstall_tasks[2:] + + install_tasks = list(install_subgraph.topological_order(reverse=True)) + assert len(install_tasks) == 4 + dependency_node_subgraph_install, dependent_node_subgraph_install = install_tasks[:2] + establish_source, establish_target = install_tasks[2:] + + assert isinstance(dependent_node_subgraph_uninstall, task.StubTask) + dependency_node_uninstall_tasks = \ + list(dependency_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependent_node_subgraph_install, task.StubTask) + dependency_node_install_tasks = \ + list(dependency_node_subgraph_install.topological_order(reverse=True)) + + assert unlink_source.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert unlink_target.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert_node_uninstall_operations(dependency_node_uninstall_tasks) + + assert_node_install_operations(dependency_node_install_tasks) + assert establish_source.name.startswith('aria.interfaces.relationship_lifecycle.establish') + assert establish_target.name.startswith('aria.interfaces.relationship_lifecycle.establish') + + +# TODO: add tests for contained in scenario diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py new file mode 100644 index 0000000..1a4e1f9 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_install.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.install import install + +from tests import mock +from tests import storage + +from . import assert_node_install_operations + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir), + topology=mock.topology.create_simple_topology_three_nodes) + yield context + storage.release_sqlite_storage(context.model) + + +def test_install(ctx): + + install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True)) + + assert len(install_tasks) == 3 + dependency_node_subgraph1, dependency_node_subgraph2, dependent_node_subgraph = install_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True)) + dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True)) + + assert_node_install_operations(dependency_node1_tasks) + assert_node_install_operations(dependency_node2_tasks) + assert_node_install_operations(dependent_node_tasks, relationships=2) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py new file mode 100644 index 0000000..aa04c38 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/builtin/test_uninstall.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.uninstall import uninstall + +from tests import mock +from tests import storage + +from . import assert_node_uninstall_operations + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir), + topology=mock.topology.create_simple_topology_three_nodes) + yield context + storage.release_sqlite_storage(context.model) + + +def test_uninstall(ctx): + + uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True)) + + assert len(uninstall_tasks) == 3 + dependent_node_subgraph, dependency_node_subgraph1, dependency_node_subgraph2 = uninstall_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True)) + dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True)) + + assert_node_uninstall_operations(operations=dependency_node1_tasks) + assert_node_uninstall_operations(operations=dependency_node2_tasks) + assert_node_uninstall_operations(operations=dependent_node_tasks, relationships=2) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py new file mode 100644 index 0000000..0c704f5 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py @@ -0,0 +1,564 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import threading +from datetime import datetime + +import pytest + +from aria.orchestrator import ( + events, + workflow, + operation, +) +from aria.modeling import models +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import thread + +from tests import mock, storage + + +global_test_holder = {} + + +class BaseTest(object): + + @classmethod + def _execute(cls, workflow_func, workflow_context, executor): + eng = cls._engine(workflow_func=workflow_func, + workflow_context=workflow_context, + executor=executor) + eng.execute(ctx=workflow_context) + return eng + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) + + return engine.Engine(executors={executor.__class__: executor}) + + @staticmethod + def _create_interface(ctx, func, arguments=None): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _op(node, + operation_name, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): + + return api.task.OperationTask( + node, + interface_name='aria.interfaces.lifecycle', + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure, + ) + + @pytest.fixture(autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(autouse=True) + def signals_registration(self, ): + def sent_task_handler(ctx, *args, **kwargs): + if ctx.task._stub_type is None: + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + def cancel_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('cancel') + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple(str(tmpdir)) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class TestEngine(BaseTest): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.SUCCEEDED + + def test_single_task_successful_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_success_task) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(node, operation_name)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_failed_task) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(node, operation_name)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED + + def test_two_tasks_execution_order(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + + @workflow + def mock_workflow(ctx, graph): + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = self._op(node, operation_name, arguments={'counter': 2}) + graph.sequence(op1, op2) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_stub_and_subworkflow_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + + @workflow + def sub_workflow(ctx, graph): + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = api.task.StubTask() + op3 = self._op(node, operation_name, arguments={'counter': 2}) + graph.sequence(op1, op2, op3) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +class TestCancel(BaseTest): + + def test_cancel_started_execution(self, workflow_context, executor): + number_of_tasks = 100 + node, _, operation_name = self._create_interface( + workflow_context, mock_sleep_task, {'seconds': 0.1}) + + @workflow + def mock_workflow(ctx, graph): + operations = ( + self._op(node, operation_name, arguments=dict(seconds=0.1)) + for _ in range(number_of_tasks) + ) + return graph.sequence(*operations) + + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context)) + t.daemon = True + t.start() + time.sleep(10) + eng.cancel_execution(workflow_context) + t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow + assert not t.is_alive() # if join is timed out it will not raise an exception + assert workflow_context.states == ['start', 'cancel'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert 0 < len(invocations) < number_of_tasks + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED + + def test_cancel_pending_execution(self, workflow_context, executor): + @workflow + def mock_workflow(graph, **_): + return graph + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + eng.cancel_execution(workflow_context) + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED + + +class TestRetries(BaseTest): + + def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 2}, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 2}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 3 + assert global_test_holder.get('sent_task_signal_calls') == 3 + + def test_infinite_retries(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=-1) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_retry_interval_float(self, workflow_context, executor): + self._test_retry_interval(retry_interval=0.3, + workflow_context=workflow_context, + executor=executor) + + def test_retry_interval_int(self, workflow_context, executor): + self._test_retry_interval(retry_interval=1, + workflow_context=workflow_context, + executor=executor) + + def _test_retry_interval(self, retry_interval, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=2, + retry_interval=retry_interval) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_ignore_failure(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + ignore_failure=True, + arguments={'failure_count': 100}, + max_attempts=100) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +class TestTaskRetryAndAbort(BaseTest): + message = 'EXPECTED_ERROR' + + def test_task_retry_default_interval(self, workflow_context, executor): + default_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_retry_custom_interval(self, workflow_context, executor): + default_retry_interval = 100 + custom_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message, + 'retry_interval': custom_retry_interval}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message, + 'retry_interval': custom_retry_interval}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + execution_start = time.time() + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + execution_end = time.time() + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + assert (execution_end - execution_start) < default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_abort(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_task_abort, {'message': self.message}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message}, + retry_interval=100, + max_attempts=100) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +@operation +def mock_success_task(**_): + pass + + +@operation +def mock_failed_task(**_): + raise RuntimeError + + +@operation +def mock_ordered_task(counter, **_): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter) + + +@operation +def mock_conditional_failure_task(failure_count, **_): + invocations = global_test_holder.setdefault('invocations', []) + try: + if len(invocations) < failure_count: + raise RuntimeError + finally: + invocations.append(time.time()) + + +@operation +def mock_sleep_task(seconds, **_): + _add_invocation_timestamp() + time.sleep(seconds) + + +@operation +def mock_task_retry(ctx, message, retry_interval=None, **_): + _add_invocation_timestamp() + retry_kwargs = {} + if retry_interval is not None: + retry_kwargs['retry_interval'] = retry_interval + ctx.task.retry(message, **retry_kwargs) + + +@operation +def mock_task_abort(ctx, message, **_): + _add_invocation_timestamp() + ctx.task.abort(message) + + +def _add_invocation_timestamp(): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(time.time()) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py new file mode 100644 index 0000000..d804de5 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.decorators import operation, workflow +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor.thread import ThreadExecutor +from aria.orchestrator.workflows import api +from aria.modeling.service_instance import NodeBase + +from tests import mock, storage + +global_test_dict = {} # used to capture transitional node state changes + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + yield context + storage.release_sqlite_storage(context.model) + +# TODO another possible approach of writing these tests: +# Don't create a ctx for every test. +# Problem is, that if for every test we create a workflow that contains just one standard +# lifecycle operation, then by the time we try to run the second test, the workflow failes since +# the execution tries to go from 'terminated' to 'pending'. +# And if we write a workflow that contains all the lifecycle operations, then first we need to +# change the api of `mock.models.create_interface`, which a lot of other tests use, and second how +# do we check all the state transition during the workflow execution in a convenient way. + +TYPE_URI_NAME = 'tosca.interfaces.node.lifecycle.Standard' +SHORTHAND_NAME = 'Standard' + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_create(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='create', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_configure(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='configure', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_start(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='start', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_stop(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='stop', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_delete(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='delete', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_create_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='create', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_configure_shorthand_name( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='configure', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_start_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='start', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_stop_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='stop', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_delete_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='delete', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete') + + +def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle1( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name='interface_name', op_name='op_name', executor=executor) + assert node.state == node.INITIAL + + +def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle2( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name='interface_name', op_name='create', executor=executor) + assert node.state == node.INITIAL + + +def run_operation_on_node(ctx, op_name, interface_name, executor): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + service=node.service, + interface_name=interface_name, + operation_name=op_name, + operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) + node.interfaces[interface.name] = interface + graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile( + single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) + ) + + eng = engine.Engine(executors={executor.__class__: executor}) + eng.execute(ctx) + return node + + +def run_standard_lifecycle_operation_on_node(ctx, op_name, executor): + return run_operation_on_node(ctx, + interface_name='aria.interfaces.lifecycle.Standard', + op_name=op_name, + executor=executor) + + +def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, op_name): + assert global_test_dict['transitional_state'] == NodeBase._OP_TO_STATE[op_name]['transitional'] + assert node.state == NodeBase._OP_TO_STATE[op_name]['finished'] + + +@workflow +def single_operation_workflow(graph, node, interface_name, op_name, **_): + graph.add_tasks(api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op_name)) + + +@operation +def func(ctx): + global_test_dict['transitional_state'] = ctx.node.state + + +@pytest.fixture +def executor(): + result = ThreadExecutor() + try: + yield result + finally: + result.close() diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py new file mode 100644 index 0000000..2b3f7d7 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import ( + datetime, + timedelta +) + +import pytest + +from aria.orchestrator.context import workflow as workflow_context +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.modeling import models + +from tests import mock, storage + +NODE_INTERFACE_NAME = 'Standard' +NODE_OPERATION_NAME = 'create' +RELATIONSHIP_INTERFACE_NAME = 'Configure' +RELATIONSHIP_OPERATION_NAME = 'pre_configure' + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + + relationship = context.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + RELATIONSHIP_INTERFACE_NAME, + RELATIONSHIP_OPERATION_NAME, + operation_kwargs=dict(function='test') + ) + relationship.interfaces[interface.name] = interface + context.model.relationship.update(relationship) + + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + NODE_INTERFACE_NAME, + NODE_OPERATION_NAME, + operation_kwargs=dict(function='test') + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + yield context + storage.release_sqlite_storage(context.model) + + +class TestOperationTask(object): + + def _create_node_operation_task(self, ctx, node): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask( + node, + interface_name=NODE_INTERFACE_NAME, + operation_name=NODE_OPERATION_NAME) + model_task = models.Task.from_api_task(api_task, None) + return api_task, model_task + + def _create_relationship_operation_task(self, ctx, relationship): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask( + relationship, + interface_name=RELATIONSHIP_INTERFACE_NAME, + operation_name=RELATIONSHIP_OPERATION_NAME) + core_task = models.Task.from_api_task(api_task, None) + return api_task, core_task + + def test_node_operation_task_creation(self, ctx): + storage_plugin = mock.models.create_plugin('p1', '0.1') + storage_plugin_other = mock.models.create_plugin('p0', '0.0') + ctx.model.plugin.put(storage_plugin) + ctx.model.plugin.put(storage_plugin_other) + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + NODE_INTERFACE_NAME, + NODE_OPERATION_NAME, + operation_kwargs=dict(plugin=storage_plugin, function='test') + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + api_task, model_task = self._create_node_operation_task(ctx, node) + assert model_task.name == api_task.name + assert model_task.function == api_task.function + assert model_task.actor == api_task.actor == node + assert model_task.arguments == api_task.arguments + assert model_task.plugin == storage_plugin + + def test_relationship_operation_task_creation(self, ctx): + relationship = ctx.model.relationship.list()[0] + ctx.model.relationship.update(relationship) + _, model_task = self._create_relationship_operation_task( + ctx, relationship) + assert model_task.actor == relationship + + @pytest.mark.skip("Currently not supported for model tasks") + def test_operation_task_edit_locked_attribute(self, ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + _, core_task = self._create_node_operation_task(ctx, node) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.attempts_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.due_at = now + + @pytest.mark.skip("Currently not supported for model tasks") + def test_operation_task_edit_attributes(self, ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + _, core_task = self._create_node_operation_task(ctx, node) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task._update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.attempts_count = 2 + core_task.due_at = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.attempts_count != 2 + assert core_task.due_at != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.attempts_count == 2 + assert core_task.due_at == future_time diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py new file mode 100644 index 0000000..e24c901 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from networkx import topological_sort, DiGraph + +from aria.modeling import models +from aria.orchestrator import context +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import graph_compiler +from aria.orchestrator.workflows.executor import base +from tests import mock +from tests import storage + + +def test_task_graph_into_execution_graph(tmpdir): + interface_name = 'Standard' + op1_name, op2_name, op3_name = 'create', 'configure', 'start' + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + interface_name, + op1_name, + operation_kwargs=dict(function='test') + ) + interface.operations[op2_name] = mock.models.create_operation(op2_name) # pylint: disable=unsubscriptable-object + interface.operations[op3_name] = mock.models.create_operation(op3_name) # pylint: disable=unsubscriptable-object + node.interfaces[interface.name] = interface + workflow_context.model.node.update(node) + + def sub_workflow(name, **_): + return api.task_graph.TaskGraph(name) + + with context.workflow.current.push(workflow_context): + test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') + simple_before_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + simple_after_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + + inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') + inner_task_1 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + inner_task_2 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op2_name) + inner_task_3 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op3_name) + inner_task_graph.add_tasks(inner_task_1) + inner_task_graph.add_tasks(inner_task_2) + inner_task_graph.add_tasks(inner_task_3) + inner_task_graph.add_dependency(inner_task_2, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_2) + + test_task_graph.add_tasks(simple_before_task) + test_task_graph.add_tasks(simple_after_task) + test_task_graph.add_tasks(inner_task_graph) + test_task_graph.add_dependency(inner_task_graph, simple_before_task) + test_task_graph.add_dependency(simple_after_task, inner_task_graph) + + compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor) + compiler.compile(test_task_graph) + + execution_tasks = topological_sort(_graph(workflow_context.execution.tasks)) + + assert len(execution_tasks) == 9 + + expected_tasks_names = [ + '{0}-Start'.format(test_task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task_1.id, + inner_task_2.id, + inner_task_3.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(test_task_graph.id) + ] + + assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks] + assert all(isinstance(task, models.Task) for task in execution_tasks) + execution_tasks = iter(execution_tasks) + + _assert_tasks( + iter(execution_tasks), + iter([simple_after_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task]) + ) + storage.release_sqlite_storage(workflow_context.model) + + +def _assert_tasks(execution_tasks, api_tasks): + start_workflow_exec_task = next(execution_tasks) + assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW + + before_exec_task = next(execution_tasks) + simple_before_task = next(api_tasks) + _assert_execution_is_api_task(before_exec_task, simple_before_task) + assert before_exec_task.dependencies == [start_workflow_exec_task] + + start_subworkflow_exec_task = next(execution_tasks) + assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW + assert start_subworkflow_exec_task.dependencies == [before_exec_task] + + inner_exec_task_1 = next(execution_tasks) + inner_task_1 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_1, inner_task_1) + assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task] + + inner_exec_task_2 = next(execution_tasks) + inner_task_2 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_2, inner_task_2) + assert inner_exec_task_2.dependencies == [inner_exec_task_1] + + inner_exec_task_3 = next(execution_tasks) + inner_task_3 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_3, inner_task_3) + assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2]) + + end_subworkflow_exec_task = next(execution_tasks) + assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW + assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3] + + after_exec_task = next(execution_tasks) + simple_after_task = next(api_tasks) + _assert_execution_is_api_task(after_exec_task, simple_after_task) + assert after_exec_task.dependencies == [end_subworkflow_exec_task] + + end_workflow_exec_task = next(execution_tasks) + assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW + assert end_workflow_exec_task.dependencies == [after_exec_task] + + +def _assert_execution_is_api_task(execution_task, api_task): + assert execution_task.name == api_task.name + assert execution_task.function == api_task.function + assert execution_task.actor == api_task.actor + assert execution_task.arguments == api_task.arguments + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] + + +def _graph(tasks): + graph = DiGraph() + for task in tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) + + return graph diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py new file mode 100644 index 0000000..99d0b39 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/__init__.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import uuid +from contextlib import contextmanager + +import aria +from aria.modeling import models +from aria.orchestrator.context.common import BaseContext + + +class MockContext(object): + + INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS + + def __init__(self, storage, task_kwargs=None): + self.logger = logging.getLogger('mock_logger') + self._task_kwargs = task_kwargs or {} + self._storage = storage + self.task = MockTask(storage, **task_kwargs) + self.states = [] + self.exception = None + + @property + def serialization_dict(self): + return { + 'context_cls': self.__class__, + 'context': { + 'storage_kwargs': self._storage.serialization_dict, + 'task_kwargs': self._task_kwargs + } + } + + def __getattr__(self, item): + return None + + def close(self): + pass + + @property + def model(self): + return self._storage + + @classmethod + def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): + return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), + task_kwargs=(task_kwargs or {})) + + @property + @contextmanager + def persist_changes(self): + yield + + +class MockActor(object): + def __init__(self): + self.name = 'actor_name' + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, model, function, arguments=None, plugin_fk=None): + self.function = self.name = function + self.plugin_fk = plugin_fk + self.arguments = arguments or {} + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + self.logger = logging.getLogger() + self.attempts_count = 1 + self.max_attempts = 1 + self.ignore_failure = False + self.interface_name = 'interface_name' + self.operation_name = 'operation_name' + self.actor = MockActor() + self.node = self.actor + self.model = model + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @property + def plugin(self): + return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py new file mode 100644 index 0000000..32a68e0 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_executor.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest +import retrying + +try: + import celery as _celery + app = _celery.Celery() + app.conf.update(CELERY_RESULT_BACKEND='amqp://') +except ImportError: + _celery = None + app = None + +import aria +from aria.modeling import models +from aria.orchestrator import events +from aria.orchestrator.workflows.executor import ( + thread, + process, + # celery +) + +import tests +from . import MockContext + + +def _get_function(func): + return '{module}.{func.__name__}'.format(module=__name__, func=func) + + +def execute_and_assert(executor, storage=None): + expected_value = 'value' + successful_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_successful_task)) + ) + failing_task = MockContext( + storage, task_kwargs=dict(function=_get_function(mock_failing_task)) + ) + task_with_inputs = MockContext( + storage, + task_kwargs=dict(function=_get_function(mock_task_with_input), + arguments={'input': models.Argument.wrap('input', 'value')}) + ) + + for task in [successful_task, failing_task, task_with_inputs]: + executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() + + +def test_thread_execute(thread_executor): + execute_and_assert(thread_executor) + + +def test_process_execute(process_executor, storage): + execute_and_assert(process_executor, storage) + + +def mock_successful_task(**_): + pass + + +def mock_failing_task(**_): + raise MockException + + +def mock_task_with_input(input, **_): + raise MockException(input) + +if app: + mock_successful_task = app.task(mock_successful_task) + mock_failing_task = app.task(mock_failing_task) + mock_task_with_input = app.task(mock_task_with_input) + + +class MockException(Exception): + pass + + +@pytest.fixture +def storage(tmpdir): + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) + + +@pytest.fixture(params=[ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + # subprocess needs to load a tests module so we explicitly add the root directory as if + # the project has been installed in editable mode + # (celery.CeleryExecutor, {'app': app}) +]) +def thread_executor(request): + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) + yield result + result.close() + + +@pytest.fixture +def process_executor(): + result = process.ProcessExecutor(python_path=tests.ROOT_DIR) + yield result + result.close() + + +@pytest.fixture(autouse=True) +def register_signals(): + def start_handler(task, *args, **kwargs): + task.states.append('start') + + def success_handler(task, *args, **kwargs): + task.states.append('success') + + def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception + + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + yield + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py new file mode 100644 index 0000000..e050d18 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import Queue +import subprocess + +import pytest +import psutil +import retrying + +import aria + +from aria import operation +from aria.modeling import models +from aria.orchestrator import events +from aria.utils.plugin import create as create_plugin +from aria.orchestrator.workflows.executor import process + +import tests.storage +import tests.resources +from tests.helpers import FilesystemDataHolder +from tests.fixtures import ( # pylint: disable=unused-import + plugins_dir, + plugin_manager, +) +from . import MockContext + + +class TestProcessExecutor(object): + + def test_plugin_execution(self, executor, mock_plugin, model, queue): + ctx = MockContext( + model, + task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id) + ) + + executor.execute(ctx) + error = queue.get(timeout=60) + # tests/resources/plugins/mock-plugin1 is the plugin installed + # during this tests setup. The module mock_plugin1 contains a single + # operation named "operation" which calls an entry point defined in the plugin's + # setup.py. This entry points simply prints 'mock-plugin-output' to stdout. + # The "operation" operation that called this subprocess, then raises a RuntimeError + # with that subprocess output as the error message. + # This is what we assert here. This tests checks that both the PYTHONPATH (operation) + # and PATH (entry point) are properly updated in the subprocess in which the task is + # running. + assert isinstance(error, RuntimeError) + assert error.message == 'mock-plugin-output' + + def test_closed(self, executor, model): + executor.close() + with pytest.raises(RuntimeError) as exc_info: + executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) + assert 'closed' in exc_info.value.message + + def test_process_termination(self, executor, model, fs_test_holder, tmpdir): + freeze_script_path = str(tmpdir.join('freeze_script')) + with open(freeze_script_path, 'w+b') as f: + f.write( + '''import time +while True: + time.sleep(5) + ''' + ) + holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path) + script_path_argument = models.Argument.wrap('freezing_script_path', + str(tmpdir.join('freeze_script'))) + + model.argument.put(holder_path_argument) + model.argument.put(script_path_argument) + ctx = MockContext( + model, + task_kwargs=dict( + function='{0}.{1}'.format(__name__, freezing_task.__name__), + arguments=dict(holder_path=holder_path_argument, + freezing_script_path=script_path_argument)), + ) + + executor.execute(ctx) + + @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + def wait_for_extra_process_id(): + return fs_test_holder.get('subproc', False) + + task_pid = executor._tasks[ctx.task.id].proc.pid + extra_process_pid = wait_for_extra_process_id() + + assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids())) + executor.terminate(ctx.task.id) + + # Give a chance to the processes to terminate + time.sleep(2) + + # all processes should be either zombies or non existent + pids = [task_pid, extra_process_pid] + for pid in pids: + if pid in psutil.pids(): + assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE + else: + # making the test more readable + assert pid not in psutil.pids() + + +@pytest.fixture +def queue(): + _queue = Queue.Queue() + + def handler(_, exception=None, **kwargs): + _queue.put(exception) + + events.on_success_task_signal.connect(handler) + events.on_failure_task_signal.connect(handler) + try: + yield _queue + finally: + events.on_success_task_signal.disconnect(handler) + events.on_failure_task_signal.disconnect(handler) + + +@pytest.fixture +def fs_test_holder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = FilesystemDataHolder(dataholder_path) + return holder + + +@pytest.fixture +def executor(plugin_manager): + result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def mock_plugin(plugin_manager, tmpdir): + source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') + plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) + return plugin_manager.install(source=plugin_path) + + +@pytest.fixture +def model(tmpdir): + _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, + initiator_kwargs=dict(base_dir=str(tmpdir))) + yield _storage + tests.storage.release_sqlite_storage(_storage) + + +@operation +def freezing_task(holder_path, freezing_script_path, **_): + holder = FilesystemDataHolder(holder_path) + holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid + while True: + time.sleep(5) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py new file mode 100644 index 0000000..86a2edf --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + +import fasteners +import pytest + +from aria.orchestrator import events +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests.orchestrator.context import execute as execute_workflow +from tests.orchestrator.workflows.helpers import events_collector +from tests import mock +from tests import storage +from tests import helpers + + +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder + + +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False) + + +@operation +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + + +def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder): + _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True) + + +@operation +def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path) + if not first: + raise RuntimeError('MESSAGE') + + +def _test(context, executor, lock_files, func, dataholder, expected_failure): + def _node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + key = 'key' + first_value = 'value1' + second_value = 'value2' + arguments = { + 'lock_files': lock_files, + 'key': key, + 'first_value': first_value, + 'second_value': second_value, + 'holder_path': dataholder.path + } + + node = _node(context) + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, func.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow + def mock_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments), + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments) + ) + + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + try: + execute_workflow(mock_workflow, context, executor) + except ExecutorException: + pass + + props = _node(context).attributes + assert dataholder['invocations'] == 2 + assert props[key].value == dataholder[key] + + exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] + if expected_failure: + assert exceptions + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) + + +@pytest.fixture +def lock_files(tmpdir): + return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) + + +def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path): + holder = helpers.FilesystemDataHolder(holder_path) + locker1 = fasteners.InterProcessLock(lock_files[0]) + locker2 = fasteners.InterProcessLock(lock_files[1]) + + first = locker1.acquire(blocking=False) + + if first: + # Give chance for both processes to acquire locks + while locker2.acquire(blocking=False): + locker2.release() + time.sleep(0.1) + else: + locker2.acquire() + + node.attributes[key] = first_value if first else second_value + holder['key'] = first_value if first else second_value + holder.setdefault('invocations', 0) + holder['invocations'] += 1 + + if first: + locker1.release() + else: + with locker1: + locker2.release() + + return first diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py new file mode 100644 index 0000000..b26fa43 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria import extension +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests import mock +from tests import storage + + +def test_decorate_extension(context, executor): + arguments = {'arg1': 1, 'arg2': 2} + + def get_node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + node = get_node(context) + interface_name = 'test_interface' + operation_name = 'operation' + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + + @workflow + def mock_workflow(ctx, graph): + node = get_node(ctx) + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) + out = get_node(context).attributes.get('out').value + assert out['wrapper_arguments'] == arguments + assert out['function_arguments'] == arguments + + +@extension.process_executor +class MockProcessExecutorExtension(object): + + def decorate(self): + def decorator(function): + def wrapper(ctx, **operation_arguments): + with ctx.model.instrument(ctx.model.node.model_cls.attributes): + ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments} + function(ctx=ctx, **operation_arguments) + return wrapper + return decorator + + +@operation +def _mock_operation(ctx, **operation_arguments): + ctx.node.attributes['out']['function_arguments'] = operation_arguments + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py new file mode 100644 index 0000000..47ee2f7 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import pytest + +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation +from aria.orchestrator.workflows import exceptions + +import tests +from tests import mock +from tests import storage + + +_TEST_ATTRIBUTES = { + 'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo' +} + + +def test_track_changes_of_successful_operation(context, executor): + _run_workflow(context=context, executor=executor, op_func=_mock_success_operation) + _assert_tracked_changes_are_applied(context) + + +def test_track_changes_of_failed_operation(context, executor): + with pytest.raises(exceptions.ExecutorException): + _run_workflow(context=context, executor=executor, op_func=_mock_fail_operation) + _assert_tracked_changes_are_applied(context) + + +def _assert_tracked_changes_are_applied(context): + instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + assert all(instance.attributes[key].value == value + for key, value in _TEST_ATTRIBUTES.items()) + + +def _update_attributes(context): + context.node.attributes.clear() + context.node.attributes.update(_TEST_ATTRIBUTES) + + +def test_refresh_state_of_tracked_attributes(context, executor): + out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation) + assert out['after_refresh'] == out['after_change'] + assert out['initial'] != out['after_change'] + + +def test_apply_tracked_changes_during_an_operation(context, executor): + arguments = { + 'committed': {'some': 'new', 'properties': 'right here'}, + 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'} + } + + expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes + out = _run_workflow( + context=context, executor=executor, op_func=_mock_updating_operation, arguments=arguments) + + expected_after_update = expected_initial.copy() + expected_after_update.update(arguments['committed']) # pylint: disable=no-member + expected_after_change = expected_after_update.copy() + expected_after_change.update(arguments['changed_but_refreshed']) # pylint: disable=no-member + + assert out['initial'] == expected_initial + assert out['after_update'] == expected_after_update + assert out['after_change'] == expected_after_change + assert out['after_refresh'] == expected_after_change + + +def _run_workflow(context, executor, op_func, arguments=None): + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'test_interface' + operation_name = 'operation' + wf_arguments = arguments or {} + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function=_operation_mapping(op_func), + arguments=wf_arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow + def mock_workflow(ctx, graph): + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=wf_arguments) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) + out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') + return out.value if out else None + + +@operation +def _mock_success_operation(ctx): + _update_attributes(ctx) + + +@operation +def _mock_fail_operation(ctx): + _update_attributes(ctx) + raise RuntimeError + + +@operation +def _mock_refreshing_operation(ctx): + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update({'some': 'new', 'properties': 'right here'}) + out['after_change'] = copy.deepcopy(ctx.node.attributes) + ctx.model.node.refresh(ctx.node) + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out + + +@operation +def _mock_updating_operation(ctx, committed, changed_but_refreshed): + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update(committed) + ctx.model.node.update(ctx.node) + out['after_update'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes.update(changed_but_refreshed) + out['after_change'] = copy.deepcopy(ctx.node.attributes) + ctx.model.node.refresh(ctx.node) + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out + + +def _operation_mapping(func): + return '{name}.{func.__name__}'.format(name=__name__, func=func) + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(str(tmpdir)) + yield result + storage.release_sqlite_storage(result.model) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py new file mode 100644 index 0000000..8e3f9b1 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/helpers.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextlib import contextmanager + + +@contextmanager +def events_collector(*signals): + handlers = {} + collected = {} + + def handler_factory(key): + def handler(*args, **kwargs): + signal_events = collected.setdefault(key, []) + signal_events.append({'args': args, 'kwargs': kwargs}) + handlers[signal] = handler + return handler + + for signal in signals: + signal.connect(handler_factory(signal)) + try: + yield collected + finally: + for signal in signals: + signal.disconnect(handlers[signal]) |