Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core')
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py                               |  14
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py                            | 564
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py                            | 171
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py                              | 153
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py   | 172
5 files changed, 1074 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py
new file mode 100644
index 0000000..0c704f5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_engine.py
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import threading
+from datetime import datetime
+
+import pytest
+
+from aria.orchestrator import (
+ events,
+ workflow,
+ operation,
+)
+from aria.modeling import models
+from aria.orchestrator.workflows import (
+ api,
+ exceptions,
+)
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import thread
+
+from tests import mock, storage
+
+
+global_test_holder = {}
+
+
+class BaseTest(object):
+
+ @classmethod
+ def _execute(cls, workflow_func, workflow_context, executor):
+ eng = cls._engine(workflow_func=workflow_func,
+ workflow_context=workflow_context,
+ executor=executor)
+ eng.execute(ctx=workflow_context)
+ return eng
+
+ @staticmethod
+ def _engine(workflow_func, workflow_context, executor):
+ graph = workflow_func(ctx=workflow_context)
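+        # compiling translates the API-level task graph into model tasks stored on the execution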
+ graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+
+ return engine.Engine(executors={executor.__class__: executor})
+
+ @staticmethod
+ def _create_interface(ctx, func, arguments=None):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if arguments:
+ # the operation has to declare the arguments before those may be passed
+ operation_kwargs['arguments'] = arguments
+ operation_name = 'create'
+ interface = mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _op(node,
+ operation_name,
+ arguments=None,
+ max_attempts=None,
+ retry_interval=None,
+ ignore_failure=None):
+
+ return api.task.OperationTask(
+ node,
+ interface_name='aria.interfaces.lifecycle',
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval,
+ ignore_failure=ignore_failure,
+ )
+
+ @pytest.fixture(autouse=True)
+ def globals_cleanup(self):
+ try:
+ yield
+ finally:
+ global_test_holder.clear()
+
+ @pytest.fixture(autouse=True)
+    def signals_registration(self):
+ def sent_task_handler(ctx, *args, **kwargs):
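+            # count only real operation tasks; stub tasks (workflow/subworkflow
+            # start and end markers) carry a _stub_type and are skipped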
+ if ctx.task._stub_type is None:
+ calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+ global_test_holder['sent_task_signal_calls'] = calls + 1
+
+ def start_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('start')
+
+ def success_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('success')
+
+ def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+ workflow_context.states.append('failure')
+ workflow_context.exception = exception
+
+ def cancel_workflow_handler(workflow_context, *args, **kwargs):
+ workflow_context.states.append('cancel')
+
+ events.start_workflow_signal.connect(start_workflow_handler)
+ events.on_success_workflow_signal.connect(success_workflow_handler)
+ events.on_failure_workflow_signal.connect(failure_workflow_handler)
+ events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
+ events.sent_task_signal.connect(sent_task_handler)
+ try:
+ yield
+ finally:
+ events.start_workflow_signal.disconnect(start_workflow_handler)
+ events.on_success_workflow_signal.disconnect(success_workflow_handler)
+ events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+ events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
+ events.sent_task_signal.disconnect(sent_task_handler)
+
+ @pytest.fixture
+ def executor(self):
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @pytest.fixture
+ def workflow_context(self, tmpdir):
+ workflow_context = mock.context.simple(str(tmpdir))
+ workflow_context.states = []
+ workflow_context.exception = None
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+
+class TestEngine(BaseTest):
+
+ def test_empty_graph_execution(self, workflow_context, executor):
+ @workflow
+ def mock_workflow(**_):
+ pass
+ self._execute(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert 'sent_task_signal_calls' not in global_test_holder
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is None
+ assert execution.status == models.Execution.SUCCEEDED
+
+ def test_single_task_successful_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(self._op(node, operation_name))
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+ def test_single_task_failed_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(self._op(node, operation_name))
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is not None
+ assert execution.status == models.Execution.FAILED
+
+ def test_two_tasks_execution_order(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = self._op(node, operation_name, arguments={'counter': 2})
+ graph.sequence(op1, op2)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('invocations') == [1, 2]
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
+ @workflow
+ def sub_workflow(ctx, graph):
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = api.task.StubTask()
+ op3 = self._op(node, operation_name, arguments={'counter': 2})
+ graph.sequence(op1, op2, op3)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+ self._execute(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert global_test_holder.get('invocations') == [1, 2]
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestCancel(BaseTest):
+
+ def test_cancel_started_execution(self, workflow_context, executor):
+ number_of_tasks = 100
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_sleep_task, {'seconds': 0.1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ operations = (
+ self._op(node, operation_name, arguments=dict(seconds=0.1))
+ for _ in range(number_of_tasks)
+ )
+ return graph.sequence(*operations)
+
+ eng = self._engine(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
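+        # run the engine in a background daemon thread so the test can cancel it mid-execution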
+ t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context))
+ t.daemon = True
+ t.start()
+ time.sleep(10)
+ eng.cancel_execution(workflow_context)
+ t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow
+        assert not t.is_alive()  # join() does not raise on timeout, so verify the thread exited
+ assert workflow_context.states == ['start', 'cancel']
+ assert workflow_context.exception is None
+ invocations = global_test_holder.get('invocations', [])
+ assert 0 < len(invocations) < number_of_tasks
+ execution = workflow_context.execution
+ assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+ assert execution.error is None
+ assert execution.status == models.Execution.CANCELLED
+
+ def test_cancel_pending_execution(self, workflow_context, executor):
+ @workflow
+ def mock_workflow(graph, **_):
+ return graph
+ eng = self._engine(workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ eng.cancel_execution(workflow_context)
+ execution = workflow_context.execution
+ assert execution.status == models.Execution.CANCELLED
+
+
+class TestRetries(BaseTest):
+
+ def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=2)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 2},
+ max_attempts=2)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+        @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=3)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 2},
+ max_attempts=3)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 3
+ assert global_test_holder.get('sent_task_signal_calls') == 3
+
+ def test_infinite_retries(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+        @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=-1)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ assert len(global_test_holder.get('invocations', [])) == 2
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_retry_interval_float(self, workflow_context, executor):
+ self._test_retry_interval(retry_interval=0.3,
+ workflow_context=workflow_context,
+ executor=executor)
+
+ def test_retry_interval_int(self, workflow_context, executor):
+ self._test_retry_interval(retry_interval=1,
+ workflow_context=workflow_context,
+ executor=executor)
+
+ def _test_retry_interval(self, retry_interval, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+        @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'failure_count': 1},
+ max_attempts=2,
+ retry_interval=retry_interval)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ invocation1, invocation2 = invocations
+ assert invocation2 - invocation1 >= retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_ignore_failure(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
+        @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ ignore_failure=True,
+ arguments={'failure_count': 100},
+ max_attempts=100)
+ graph.add_tasks(op)
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'success']
+ assert workflow_context.exception is None
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 1
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+class TestTaskRetryAndAbort(BaseTest):
+ message = 'EXPECTED_ERROR'
+
+ def test_task_retry_default_interval(self, workflow_context, executor):
+ default_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message},
+ retry_interval=default_retry_interval,
+ max_attempts=2)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ invocation1, invocation2 = invocations
+ assert invocation2 - invocation1 >= default_retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_task_retry_custom_interval(self, workflow_context, executor):
+ default_retry_interval = 100
+ custom_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message,
+ 'retry_interval': custom_retry_interval})
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message,
+ 'retry_interval': custom_retry_interval},
+ retry_interval=default_retry_interval,
+ max_attempts=2)
+ graph.add_tasks(op)
+ execution_start = time.time()
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ execution_end = time.time()
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 2
+ assert (execution_end - execution_start) < default_retry_interval
+ assert global_test_holder.get('sent_task_signal_calls') == 2
+
+ def test_task_abort(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_abort, {'message': self.message})
+
+        @workflow
+ def mock_workflow(ctx, graph):
+ op = self._op(node, operation_name,
+ arguments={'message': self.message},
+ retry_interval=100,
+ max_attempts=100)
+ graph.add_tasks(op)
+ with pytest.raises(exceptions.ExecutorException):
+ self._execute(
+ workflow_func=mock_workflow,
+ workflow_context=workflow_context,
+ executor=executor)
+ assert workflow_context.states == ['start', 'failure']
+ assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+ invocations = global_test_holder.get('invocations', [])
+ assert len(invocations) == 1
+ assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+@operation
+def mock_success_task(**_):
+ pass
+
+
+@operation
+def mock_failed_task(**_):
+ raise RuntimeError
+
+
+@operation
+def mock_ordered_task(counter, **_):
+ invocations = global_test_holder.setdefault('invocations', [])
+ invocations.append(counter)
+
+
+@operation
+def mock_conditional_failure_task(failure_count, **_):
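+    # fail on each invocation until failure_count failures have been recorded, then succeed;
+    # every call appends its timestamp to the shared invocation list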
+ invocations = global_test_holder.setdefault('invocations', [])
+ try:
+ if len(invocations) < failure_count:
+ raise RuntimeError
+ finally:
+ invocations.append(time.time())
+
+
+@operation
+def mock_sleep_task(seconds, **_):
+ _add_invocation_timestamp()
+ time.sleep(seconds)
+
+
+@operation
+def mock_task_retry(ctx, message, retry_interval=None, **_):
+ _add_invocation_timestamp()
+ retry_kwargs = {}
+ if retry_interval is not None:
+ retry_kwargs['retry_interval'] = retry_interval
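+    # retry() ends the current attempt and asks the engine to schedule another one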
+ ctx.task.retry(message, **retry_kwargs)
+
+
+@operation
+def mock_task_abort(ctx, message, **_):
+ _add_invocation_timestamp()
+ ctx.task.abort(message)
+
+
+def _add_invocation_timestamp():
+ invocations = global_test_holder.setdefault('invocations', [])
+ invocations.append(time.time())
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py
new file mode 100644
index 0000000..d804de5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_events.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.decorators import operation, workflow
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor.thread import ThreadExecutor
+from aria.orchestrator.workflows import api
+from aria.modeling.service_instance import NodeBase
+
+from tests import mock, storage
+
+global_test_dict = {} # used to capture transitional node state changes
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+# TODO: another possible approach to writing these tests:
+# Don't create a ctx for every test.
+# The problem is that if for every test we create a workflow that contains just one standard
+# lifecycle operation, then by the time we try to run the second test the workflow fails, since
+# the execution tries to go from 'terminated' to 'pending'.
+# And if we write a workflow that contains all the lifecycle operations, then first we would need
+# to change the API of `mock.models.create_interface`, which a lot of other tests use, and second
+# there would be no convenient way to check all the state transitions during the workflow
+# execution.
+
+TYPE_URI_NAME = 'tosca.interfaces.node.lifecycle.Standard'
+SHORTHAND_NAME = 'Standard'
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='create', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='configure', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='start', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='stop', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=TYPE_URI_NAME, op_name='delete', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='create', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure_shorthand_name(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='configure', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='start', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='stop', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete_shorthand_name(ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name=SHORTHAND_NAME, op_name='delete', executor=executor)
+ _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle1(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name='interface_name', op_name='op_name', executor=executor)
+ assert node.state == node.INITIAL
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle2(
+ ctx, executor):
+ node = run_operation_on_node(
+ ctx, interface_name='interface_name', op_name='create', executor=executor)
+ assert node.state == node.INITIAL
+
+
+def run_operation_on_node(ctx, op_name, interface_name, executor):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ service=node.service,
+ interface_name=interface_name,
+ operation_name=op_name,
+ operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
+ node.interfaces[interface.name] = interface
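+    # compile a single-operation workflow into execution tasks, then run it through the engine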
+ graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile(
+ single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
+ )
+
+ eng = engine.Engine(executors={executor.__class__: executor})
+ eng.execute(ctx)
+ return node
+
+
+def run_standard_lifecycle_operation_on_node(ctx, op_name, executor):
+ return run_operation_on_node(ctx,
+ interface_name='aria.interfaces.lifecycle.Standard',
+ op_name=op_name,
+ executor=executor)
+
+
+def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, op_name):
+ assert global_test_dict['transitional_state'] == NodeBase._OP_TO_STATE[op_name]['transitional']
+ assert node.state == NodeBase._OP_TO_STATE[op_name]['finished']
+
+
+@workflow
+def single_operation_workflow(graph, node, interface_name, op_name, **_):
+ graph.add_tasks(api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op_name))
+
+
+@operation
+def func(ctx):
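+    # capture the node state as seen while the operation runs, i.e. the transitional state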
+ global_test_dict['transitional_state'] = ctx.node.state
+
+
+@pytest.fixture
+def executor():
+ result = ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py
new file mode 100644
index 0000000..2b3f7d7
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import (
+ datetime,
+ timedelta
+)
+
+import pytest
+
+from aria.orchestrator.context import workflow as workflow_context
+from aria.orchestrator.workflows import (
+ api,
+ exceptions,
+)
+from aria.modeling import models
+
+from tests import mock, storage
+
+NODE_INTERFACE_NAME = 'Standard'
+NODE_OPERATION_NAME = 'create'
+RELATIONSHIP_INTERFACE_NAME = 'Configure'
+RELATIONSHIP_OPERATION_NAME = 'pre_configure'
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+
+ relationship = context.model.relationship.list()[0]
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ RELATIONSHIP_INTERFACE_NAME,
+ RELATIONSHIP_OPERATION_NAME,
+ operation_kwargs=dict(function='test')
+ )
+ relationship.interfaces[interface.name] = interface
+ context.model.relationship.update(relationship)
+
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ NODE_INTERFACE_NAME,
+ NODE_OPERATION_NAME,
+ operation_kwargs=dict(function='test')
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+class TestOperationTask(object):
+
+ def _create_node_operation_task(self, ctx, node):
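+        # creating an OperationTask requires an active workflow context, hence the push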
+ with workflow_context.current.push(ctx):
+ api_task = api.task.OperationTask(
+ node,
+ interface_name=NODE_INTERFACE_NAME,
+ operation_name=NODE_OPERATION_NAME)
+ model_task = models.Task.from_api_task(api_task, None)
+ return api_task, model_task
+
+ def _create_relationship_operation_task(self, ctx, relationship):
+ with workflow_context.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=RELATIONSHIP_INTERFACE_NAME,
+ operation_name=RELATIONSHIP_OPERATION_NAME)
+ core_task = models.Task.from_api_task(api_task, None)
+ return api_task, core_task
+
+ def test_node_operation_task_creation(self, ctx):
+ storage_plugin = mock.models.create_plugin('p1', '0.1')
+ storage_plugin_other = mock.models.create_plugin('p0', '0.0')
+ ctx.model.plugin.put(storage_plugin)
+ ctx.model.plugin.put(storage_plugin_other)
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ NODE_INTERFACE_NAME,
+ NODE_OPERATION_NAME,
+ operation_kwargs=dict(plugin=storage_plugin, function='test')
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+ api_task, model_task = self._create_node_operation_task(ctx, node)
+ assert model_task.name == api_task.name
+ assert model_task.function == api_task.function
+ assert model_task.actor == api_task.actor == node
+ assert model_task.arguments == api_task.arguments
+ assert model_task.plugin == storage_plugin
+
+ def test_relationship_operation_task_creation(self, ctx):
+ relationship = ctx.model.relationship.list()[0]
+ ctx.model.relationship.update(relationship)
+ _, model_task = self._create_relationship_operation_task(
+ ctx, relationship)
+ assert model_task.actor == relationship
+
+ @pytest.mark.skip("Currently not supported for model tasks")
+ def test_operation_task_edit_locked_attribute(self, ctx):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ _, core_task = self._create_node_operation_task(ctx, node)
+ now = datetime.utcnow()
+ with pytest.raises(exceptions.TaskException):
+ core_task.status = core_task.STARTED
+ with pytest.raises(exceptions.TaskException):
+ core_task.started_at = now
+ with pytest.raises(exceptions.TaskException):
+ core_task.ended_at = now
+ with pytest.raises(exceptions.TaskException):
+ core_task.attempts_count = 2
+ with pytest.raises(exceptions.TaskException):
+ core_task.due_at = now
+
+ @pytest.mark.skip("Currently not supported for model tasks")
+ def test_operation_task_edit_attributes(self, ctx):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ _, core_task = self._create_node_operation_task(ctx, node)
+ future_time = datetime.utcnow() + timedelta(seconds=3)
+
+ with core_task._update():
+ core_task.status = core_task.STARTED
+ core_task.started_at = future_time
+ core_task.ended_at = future_time
+ core_task.attempts_count = 2
+ core_task.due_at = future_time
+ assert core_task.status != core_task.STARTED
+ assert core_task.started_at != future_time
+ assert core_task.ended_at != future_time
+ assert core_task.attempts_count != 2
+ assert core_task.due_at != future_time
+
+ assert core_task.status == core_task.STARTED
+ assert core_task.started_at == future_time
+ assert core_task.ended_at == future_time
+ assert core_task.attempts_count == 2
+ assert core_task.due_at == future_time
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
new file mode 100644
index 0000000..e24c901
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from networkx import topological_sort, DiGraph
+
+from aria.modeling import models
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import graph_compiler
+from aria.orchestrator.workflows.executor import base
+from tests import mock
+from tests import storage
+
+
+def test_task_graph_into_execution_graph(tmpdir):
+ interface_name = 'Standard'
+ op1_name, op2_name, op3_name = 'create', 'configure', 'start'
+ workflow_context = mock.context.simple(str(tmpdir))
+ node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ op1_name,
+ operation_kwargs=dict(function='test')
+ )
+ interface.operations[op2_name] = mock.models.create_operation(op2_name) # pylint: disable=unsubscriptable-object
+ interface.operations[op3_name] = mock.models.create_operation(op3_name) # pylint: disable=unsubscriptable-object
+ node.interfaces[interface.name] = interface
+ workflow_context.model.node.update(node)
+
+ def sub_workflow(name, **_):
+ return api.task_graph.TaskGraph(name)
+
+ with context.workflow.current.push(workflow_context):
+ test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+ simple_before_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+ simple_after_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+
+ inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+ inner_task_1 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op1_name)
+ inner_task_2 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op2_name)
+ inner_task_3 = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=op3_name)
+ inner_task_graph.add_tasks(inner_task_1)
+ inner_task_graph.add_tasks(inner_task_2)
+ inner_task_graph.add_tasks(inner_task_3)
+ inner_task_graph.add_dependency(inner_task_2, inner_task_1)
+ inner_task_graph.add_dependency(inner_task_3, inner_task_1)
+ inner_task_graph.add_dependency(inner_task_3, inner_task_2)
+
+ test_task_graph.add_tasks(simple_before_task)
+ test_task_graph.add_tasks(simple_after_task)
+ test_task_graph.add_tasks(inner_task_graph)
+ test_task_graph.add_dependency(inner_task_graph, simple_before_task)
+ test_task_graph.add_dependency(simple_after_task, inner_task_graph)
+
+ compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor)
+ compiler.compile(test_task_graph)
+
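+    # note: topological_sort returning a sequence here (rather than a generator)
+    # assumes the networkx 1.x series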
+ execution_tasks = topological_sort(_graph(workflow_context.execution.tasks))
+
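+    # 5 operation tasks, plus start/end stubs for the outer graph and the inner graph = 9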
+ assert len(execution_tasks) == 9
+
+ expected_tasks_names = [
+ '{0}-Start'.format(test_task_graph.id),
+ simple_before_task.id,
+ '{0}-Start'.format(inner_task_graph.id),
+ inner_task_1.id,
+ inner_task_2.id,
+ inner_task_3.id,
+ '{0}-End'.format(inner_task_graph.id),
+ simple_after_task.id,
+ '{0}-End'.format(test_task_graph.id)
+ ]
+
+ assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks]
+ assert all(isinstance(task, models.Task) for task in execution_tasks)
+    execution_tasks = iter(execution_tasks)
+
+    _assert_tasks(
+        execution_tasks,
+        iter([simple_before_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task])
+    )
+ storage.release_sqlite_storage(workflow_context.model)
+
+
+def _assert_tasks(execution_tasks, api_tasks):
+ start_workflow_exec_task = next(execution_tasks)
+ assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW
+
+ before_exec_task = next(execution_tasks)
+ simple_before_task = next(api_tasks)
+ _assert_execution_is_api_task(before_exec_task, simple_before_task)
+ assert before_exec_task.dependencies == [start_workflow_exec_task]
+
+ start_subworkflow_exec_task = next(execution_tasks)
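+    # 'START_SUBWROFKLOW' is spelled this way to match the constant as defined on models.Task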
+ assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW
+ assert start_subworkflow_exec_task.dependencies == [before_exec_task]
+
+ inner_exec_task_1 = next(execution_tasks)
+ inner_task_1 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_1, inner_task_1)
+ assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task]
+
+ inner_exec_task_2 = next(execution_tasks)
+ inner_task_2 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_2, inner_task_2)
+ assert inner_exec_task_2.dependencies == [inner_exec_task_1]
+
+ inner_exec_task_3 = next(execution_tasks)
+ inner_task_3 = next(api_tasks)
+ _assert_execution_is_api_task(inner_exec_task_3, inner_task_3)
+ assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2])
+
+ end_subworkflow_exec_task = next(execution_tasks)
+ assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW
+ assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3]
+
+ after_exec_task = next(execution_tasks)
+ simple_after_task = next(api_tasks)
+ _assert_execution_is_api_task(after_exec_task, simple_after_task)
+ assert after_exec_task.dependencies == [end_subworkflow_exec_task]
+
+ end_workflow_exec_task = next(execution_tasks)
+ assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW
+ assert end_workflow_exec_task.dependencies == [after_exec_task]
+
+
+def _assert_execution_is_api_task(execution_task, api_task):
+ assert execution_task.name == api_task.name
+ assert execution_task.function == api_task.function
+ assert execution_task.actor == api_task.actor
+ assert execution_task.arguments == api_task.arguments
+
+
+def _get_task_by_name(task_name, graph):
+ return graph.node[task_name]['task']
+
+
+def _graph(tasks):
+ graph = DiGraph()
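+    # edges point from dependency to dependent, so a topological sort yields execution order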
+ for task in tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
+
+ return graph