summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py223
1 files changed, 223 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
new file mode 100644
index 0000000..9d91b6b
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/api/test_task.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+
+from tests import mock, storage
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ """
+ Create the following graph in storage:
+ dependency_node <------ dependent_node
+ :return:
+ """
+ simple_context = mock.context.simple(str(tmpdir), inmemory=False)
+ simple_context.model.execution.put(mock.models.create_execution(simple_context.service))
+ yield simple_context
+ storage.release_sqlite_storage(simple_context.model)
+
+
+class TestOperationTask(object):
+
+ def test_node_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'create'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.node.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments),)
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.interfaces[interface_name] = interface
+ ctx.model.node.update(node)
+ max_attempts = 10
+ retry_interval = 10
+ ignore_failure = True
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval,
+ ignore_failure=ignore_failure)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='node',
+ name=node.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == node
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.ignore_failure == ignore_failure
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_source_relationship_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'preconfigure'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.plugin.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments)
+ )
+
+ relationship = ctx.model.relationship.list()[0]
+ relationship.interfaces[interface.name] = interface
+ max_attempts = 10
+ retry_interval = 10
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='relationship',
+ name=relationship.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == relationship
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_target_relationship_operation_task_creation(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'preconfigure'
+
+ plugin = mock.models.create_plugin('test_plugin', '0.1')
+ ctx.model.node.update(plugin)
+
+ arguments = {'test_input': True}
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path',
+ arguments=arguments)
+ )
+
+ relationship = ctx.model.relationship.list()[0]
+ relationship.interfaces[interface.name] = interface
+ max_attempts = 10
+ retry_interval = 10
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval)
+
+ assert api_task.name == api.task.OperationTask.NAME_FORMAT.format(
+ type='relationship',
+ name=relationship.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ assert api_task.function == 'op_path'
+ assert api_task.actor == relationship
+ assert api_task.arguments['test_input'].value is True
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.plugin.name == 'test_plugin'
+
+ def test_operation_task_default_values(self, ctx):
+ interface_name = 'test_interface'
+ operation_name = 'create'
+
+ plugin = mock.models.create_plugin('package', '0.1')
+ ctx.model.node.update(plugin)
+
+ dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ interface = mock.models.create_interface(
+ ctx.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(plugin=plugin,
+ function='op_path'))
+ dependency_node.interfaces[interface_name] = interface
+
+ with context.workflow.current.push(ctx):
+ task = api.task.OperationTask(
+ dependency_node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+
+ assert task.arguments == {}
+ assert task.retry_interval == ctx._task_retry_interval
+ assert task.max_attempts == ctx._task_max_attempts
+ assert task.ignore_failure == ctx._task_ignore_failure
+ assert task.plugin is plugin
+
+
+class TestWorkflowTask(object):
+
+ def test_workflow_task_creation(self, ctx):
+
+ workspace = {}
+
+ mock_class = type('mock_class', (object,), {'test_attribute': True})
+
+ def sub_workflow(**kwargs):
+ workspace.update(kwargs)
+ return mock_class
+
+ with context.workflow.current.push(ctx):
+ workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg')
+ assert workflow_task.graph is mock_class
+ assert workflow_task.test_attribute is True