diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core')
5 files changed, 1074 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py new file mode 100644 index 0000000..0c704f5 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py @@ -0,0 +1,564 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import threading +from datetime import datetime + +import pytest + +from aria.orchestrator import ( + events, + workflow, + operation, +) +from aria.modeling import models +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import thread + +from tests import mock, storage + + +global_test_holder = {} + + +class BaseTest(object): + + @classmethod + def _execute(cls, workflow_func, workflow_context, executor): + eng = cls._engine(workflow_func=workflow_func, + workflow_context=workflow_context, + executor=executor) + eng.execute(ctx=workflow_context) + return eng + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) + + return engine.Engine(executors={executor.__class__: executor}) + + @staticmethod + def _create_interface(ctx, func, arguments=None): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _op(node, + operation_name, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): + + return api.task.OperationTask( + node, + interface_name='aria.interfaces.lifecycle', + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure, + ) + + @pytest.fixture(autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(autouse=True) + def signals_registration(self, ): + def sent_task_handler(ctx, *args, **kwargs): + if ctx.task._stub_type is None: + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + def cancel_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('cancel') + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple(str(tmpdir)) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class TestEngine(BaseTest): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.SUCCEEDED + + def test_single_task_successful_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_success_task) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(node, operation_name)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_failed_task) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(node, operation_name)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED + + def test_two_tasks_execution_order(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + + @workflow + def mock_workflow(ctx, graph): + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = self._op(node, operation_name, arguments={'counter': 2}) + graph.sequence(op1, op2) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_stub_and_subworkflow_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + + @workflow + def sub_workflow(ctx, graph): + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = api.task.StubTask() + op3 = self._op(node, operation_name, arguments={'counter': 2}) + graph.sequence(op1, op2, op3) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +class TestCancel(BaseTest): + + def test_cancel_started_execution(self, workflow_context, executor): + number_of_tasks = 100 + node, _, operation_name = self._create_interface( + workflow_context, mock_sleep_task, {'seconds': 0.1}) + + @workflow + def mock_workflow(ctx, graph): + operations = ( + self._op(node, operation_name, arguments=dict(seconds=0.1)) + for _ in range(number_of_tasks) + ) + return graph.sequence(*operations) + + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context)) + t.daemon = True + t.start() + time.sleep(10) + eng.cancel_execution(workflow_context) + t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow + assert not t.is_alive() # if join is timed out it will not raise an exception + assert workflow_context.states == ['start', 'cancel'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert 0 < len(invocations) < number_of_tasks + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED + + def test_cancel_pending_execution(self, workflow_context, executor): + @workflow + def mock_workflow(graph, **_): + return graph + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + eng.cancel_execution(workflow_context) + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED + + +class TestRetries(BaseTest): + + def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 2}, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 2}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 3 + assert global_test_holder.get('sent_task_signal_calls') == 3 + + def test_infinite_retries(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=-1) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_retry_interval_float(self, workflow_context, executor): + self._test_retry_interval(retry_interval=0.3, + workflow_context=workflow_context, + executor=executor) + + def test_retry_interval_int(self, workflow_context, executor): + self._test_retry_interval(retry_interval=1, + workflow_context=workflow_context, + executor=executor) + + def _test_retry_interval(self, retry_interval, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'failure_count': 1}, + max_attempts=2, + retry_interval=retry_interval) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_ignore_failure(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + ignore_failure=True, + arguments={'failure_count': 100}, + max_attempts=100) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +class TestTaskRetryAndAbort(BaseTest): + message = 'EXPECTED_ERROR' + + def test_task_retry_default_interval(self, workflow_context, executor): + default_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_retry_custom_interval(self, workflow_context, executor): + default_retry_interval = 100 + custom_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message, + 'retry_interval': custom_retry_interval}) + + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message, + 'retry_interval': custom_retry_interval}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + execution_start = time.time() + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + execution_end = time.time() + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + assert (execution_end - execution_start) < default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_abort(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_task_abort, {'message': self.message}) + @workflow + def mock_workflow(ctx, graph): + op = self._op(node, operation_name, + arguments={'message': self.message}, + retry_interval=100, + max_attempts=100) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +@operation +def mock_success_task(**_): + pass + + +@operation +def mock_failed_task(**_): + raise RuntimeError + + +@operation +def mock_ordered_task(counter, **_): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter) + + +@operation +def mock_conditional_failure_task(failure_count, **_): + invocations = global_test_holder.setdefault('invocations', []) + try: + if len(invocations) < failure_count: + raise RuntimeError + finally: + invocations.append(time.time()) + + +@operation +def mock_sleep_task(seconds, **_): + _add_invocation_timestamp() + time.sleep(seconds) + + +@operation +def mock_task_retry(ctx, message, retry_interval=None, **_): + _add_invocation_timestamp() + retry_kwargs = {} + if retry_interval is not None: + retry_kwargs['retry_interval'] = retry_interval + ctx.task.retry(message, **retry_kwargs) + + +@operation +def mock_task_abort(ctx, message, **_): + _add_invocation_timestamp() + ctx.task.abort(message) + + +def _add_invocation_timestamp(): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(time.time()) diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py new file mode 100644 index 0000000..d804de5 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.decorators import operation, workflow +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor.thread import ThreadExecutor +from aria.orchestrator.workflows import api +from aria.modeling.service_instance import NodeBase + +from tests import mock, storage + +global_test_dict = {} # used to capture transitional node state changes + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + yield context + storage.release_sqlite_storage(context.model) + +# TODO another possible approach of writing these tests: +# Don't create a ctx for every test. +# Problem is, that if for every test we create a workflow that contains just one standard +# lifecycle operation, then by the time we try to run the second test, the workflow failes since +# the execution tries to go from 'terminated' to 'pending'. +# And if we write a workflow that contains all the lifecycle operations, then first we need to +# change the api of `mock.models.create_interface`, which a lot of other tests use, and second how +# do we check all the state transition during the workflow execution in a convenient way. + +TYPE_URI_NAME = 'tosca.interfaces.node.lifecycle.Standard' +SHORTHAND_NAME = 'Standard' + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_create(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='create', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_configure(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='configure', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_start(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='start', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_stop(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='stop', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_delete(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=TYPE_URI_NAME, op_name='delete', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_create_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='create', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_configure_shorthand_name( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='configure', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_start_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='start', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_stop_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='stop', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop') + + +def test_node_state_changes_as_a_result_of_standard_lifecycle_delete_shorthand_name(ctx, executor): + node = run_operation_on_node( + ctx, interface_name=SHORTHAND_NAME, op_name='delete', executor=executor) + _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete') + + +def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle1( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name='interface_name', op_name='op_name', executor=executor) + assert node.state == node.INITIAL + + +def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle2( + ctx, executor): + node = run_operation_on_node( + ctx, interface_name='interface_name', op_name='create', executor=executor) + assert node.state == node.INITIAL + + +def run_operation_on_node(ctx, op_name, interface_name, executor): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + service=node.service, + interface_name=interface_name, + operation_name=op_name, + operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) + node.interfaces[interface.name] = interface + graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile( + single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) + ) + + eng = engine.Engine(executors={executor.__class__: executor}) + eng.execute(ctx) + return node + + +def run_standard_lifecycle_operation_on_node(ctx, op_name, executor): + return run_operation_on_node(ctx, + interface_name='aria.interfaces.lifecycle.Standard', + op_name=op_name, + executor=executor) + + +def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, op_name): + assert global_test_dict['transitional_state'] == NodeBase._OP_TO_STATE[op_name]['transitional'] + assert node.state == NodeBase._OP_TO_STATE[op_name]['finished'] + + +@workflow +def single_operation_workflow(graph, node, interface_name, op_name, **_): + graph.add_tasks(api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op_name)) + + +@operation +def func(ctx): + global_test_dict['transitional_state'] = ctx.node.state + + +@pytest.fixture +def executor(): + result = ThreadExecutor() + try: + yield result + finally: + result.close() diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py new file mode 100644 index 0000000..2b3f7d7 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import ( + datetime, + timedelta +) + +import pytest + +from aria.orchestrator.context import workflow as workflow_context +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.modeling import models + +from tests import mock, storage + +NODE_INTERFACE_NAME = 'Standard' +NODE_OPERATION_NAME = 'create' +RELATIONSHIP_INTERFACE_NAME = 'Configure' +RELATIONSHIP_OPERATION_NAME = 'pre_configure' + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple(str(tmpdir)) + + relationship = context.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + RELATIONSHIP_INTERFACE_NAME, + RELATIONSHIP_OPERATION_NAME, + operation_kwargs=dict(function='test') + ) + relationship.interfaces[interface.name] = interface + context.model.relationship.update(relationship) + + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + NODE_INTERFACE_NAME, + NODE_OPERATION_NAME, + operation_kwargs=dict(function='test') + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + yield context + storage.release_sqlite_storage(context.model) + + +class TestOperationTask(object): + + def _create_node_operation_task(self, ctx, node): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask( + node, + interface_name=NODE_INTERFACE_NAME, + operation_name=NODE_OPERATION_NAME) + model_task = models.Task.from_api_task(api_task, None) + return api_task, model_task + + def _create_relationship_operation_task(self, ctx, relationship): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask( + relationship, + interface_name=RELATIONSHIP_INTERFACE_NAME, + operation_name=RELATIONSHIP_OPERATION_NAME) + core_task = models.Task.from_api_task(api_task, None) + return api_task, core_task + + def test_node_operation_task_creation(self, ctx): + storage_plugin = mock.models.create_plugin('p1', '0.1') + storage_plugin_other = mock.models.create_plugin('p0', '0.0') + ctx.model.plugin.put(storage_plugin) + ctx.model.plugin.put(storage_plugin_other) + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + NODE_INTERFACE_NAME, + NODE_OPERATION_NAME, + operation_kwargs=dict(plugin=storage_plugin, function='test') + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + api_task, model_task = self._create_node_operation_task(ctx, node) + assert model_task.name == api_task.name + assert model_task.function == api_task.function + assert model_task.actor == api_task.actor == node + assert model_task.arguments == api_task.arguments + assert model_task.plugin == storage_plugin + + def test_relationship_operation_task_creation(self, ctx): + relationship = ctx.model.relationship.list()[0] + ctx.model.relationship.update(relationship) + _, model_task = self._create_relationship_operation_task( + ctx, relationship) + assert model_task.actor == relationship + + @pytest.mark.skip("Currently not supported for model tasks") + def test_operation_task_edit_locked_attribute(self, ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + _, core_task = self._create_node_operation_task(ctx, node) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.attempts_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.due_at = now + + @pytest.mark.skip("Currently not supported for model tasks") + def test_operation_task_edit_attributes(self, ctx): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + _, core_task = self._create_node_operation_task(ctx, node) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task._update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.attempts_count = 2 + core_task.due_at = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.attempts_count != 2 + assert core_task.due_at != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.attempts_count == 2 + assert core_task.due_at == future_time diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py new file mode 100644 index 0000000..e24c901 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from networkx import topological_sort, DiGraph + +from aria.modeling import models +from aria.orchestrator import context +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import graph_compiler +from aria.orchestrator.workflows.executor import base +from tests import mock +from tests import storage + + +def test_task_graph_into_execution_graph(tmpdir): + interface_name = 'Standard' + op1_name, op2_name, op3_name = 'create', 'configure', 'start' + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + interface_name, + op1_name, + operation_kwargs=dict(function='test') + ) + interface.operations[op2_name] = mock.models.create_operation(op2_name) # pylint: disable=unsubscriptable-object + interface.operations[op3_name] = mock.models.create_operation(op3_name) # pylint: disable=unsubscriptable-object + node.interfaces[interface.name] = interface + workflow_context.model.node.update(node) + + def sub_workflow(name, **_): + return api.task_graph.TaskGraph(name) + + with context.workflow.current.push(workflow_context): + test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') + simple_before_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + simple_after_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + + inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') + inner_task_1 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op1_name) + inner_task_2 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op2_name) + inner_task_3 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op3_name) + inner_task_graph.add_tasks(inner_task_1) + inner_task_graph.add_tasks(inner_task_2) + inner_task_graph.add_tasks(inner_task_3) + inner_task_graph.add_dependency(inner_task_2, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_2) + + test_task_graph.add_tasks(simple_before_task) + test_task_graph.add_tasks(simple_after_task) + test_task_graph.add_tasks(inner_task_graph) + test_task_graph.add_dependency(inner_task_graph, simple_before_task) + test_task_graph.add_dependency(simple_after_task, inner_task_graph) + + compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor) + compiler.compile(test_task_graph) + + execution_tasks = topological_sort(_graph(workflow_context.execution.tasks)) + + assert len(execution_tasks) == 9 + + expected_tasks_names = [ + '{0}-Start'.format(test_task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task_1.id, + inner_task_2.id, + inner_task_3.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(test_task_graph.id) + ] + + assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks] + assert all(isinstance(task, models.Task) for task in execution_tasks) + execution_tasks = iter(execution_tasks) + + _assert_tasks( + iter(execution_tasks), + iter([simple_after_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task]) + ) + storage.release_sqlite_storage(workflow_context.model) + + +def _assert_tasks(execution_tasks, api_tasks): + start_workflow_exec_task = next(execution_tasks) + assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW + + before_exec_task = next(execution_tasks) + simple_before_task = next(api_tasks) + _assert_execution_is_api_task(before_exec_task, simple_before_task) + assert before_exec_task.dependencies == [start_workflow_exec_task] + + start_subworkflow_exec_task = next(execution_tasks) + assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW + assert start_subworkflow_exec_task.dependencies == [before_exec_task] + + inner_exec_task_1 = next(execution_tasks) + inner_task_1 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_1, inner_task_1) + assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task] + + inner_exec_task_2 = next(execution_tasks) + inner_task_2 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_2, inner_task_2) + assert inner_exec_task_2.dependencies == [inner_exec_task_1] + + inner_exec_task_3 = next(execution_tasks) + inner_task_3 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_3, inner_task_3) + assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2]) + + end_subworkflow_exec_task = next(execution_tasks) + assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW + assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3] + + after_exec_task = next(execution_tasks) + simple_after_task = next(api_tasks) + _assert_execution_is_api_task(after_exec_task, simple_after_task) + assert after_exec_task.dependencies == [end_subworkflow_exec_task] + + end_workflow_exec_task = next(execution_tasks) + assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW + assert end_workflow_exec_task.dependencies == [after_exec_task] + + +def _assert_execution_is_api_task(execution_task, api_task): + assert execution_task.name == api_task.name + assert execution_task.function == api_task.function + assert execution_task.actor == api_task.actor + assert execution_task.arguments == api_task.arguments + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] + + +def _graph(tasks): + graph = DiGraph() + for task in tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) + + return graph |