Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py')
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py | 498
1 file changed, 498 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py
new file mode 100644
index 0000000..111e121
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py
@@ -0,0 +1,498 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+
+import pytest
+
+from aria.orchestrator.workflows.executor import process, thread
+
+from aria import (
+ workflow,
+ operation,
+)
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api
+
+import tests
+from tests import (
+ mock,
+ storage,
+ helpers
+)
+from . import (
+ op_path,
+ execute,
+)
+
+
+@pytest.fixture
+def ctx(tmpdir):
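+    """Simple mock workflow context stored under tmpdir, with its own workdir."""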
+ context = mock.context.simple(
+ str(tmpdir),
+ context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+ )
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+
+@pytest.fixture
+def thread_executor():
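+    """Thread-based executor, closed once the test using it finishes."""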
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def dataholder(tmpdir):
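+    """File-backed data holder the operations use to pass results back to the test."""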
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = helpers.FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+def test_node_operation_task_execution(ctx, thread_executor, dataholder):
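+    """Runs a node operation and verifies the task- and context-level data it reports."""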
+ interface_name = 'Standard'
+ operation_name = 'create'
+
+ arguments = {'putput': True, 'holder_path': dataholder.path}
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=op_path(basic_node_operation, module_path=__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
+
+ assert dataholder['ctx_name'] == context.operation.NodeOperationContext.__name__
+
+    # Task-based assertions
+ assert dataholder['actor_name'] == node.name
+ assert dataholder['task_name'] == api.task.OperationTask.NAME_FORMAT.format(
+ type='node',
+ name=node.name,
+ interface=interface_name,
+ operation=operation_name
+ )
+ operations = interface.operations
+ assert len(operations) == 1
+ assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member
+ assert dataholder['arguments']['putput'] is True
+
+    # Context-based attributes (sugaring)
+ assert dataholder['template_name'] == node.node_template.name
+ assert dataholder['node_name'] == node.name
+
+
+def test_relationship_operation_task_execution(ctx, thread_executor, dataholder):
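+    """Runs a relationship operation and verifies the task- and context-level data it reports."""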
+ interface_name = 'Configure'
+ operation_name = 'post_configure'
+
+ arguments = {'putput': True, 'holder_path': dataholder.path}
+ relationship = ctx.model.relationship.list()[0]
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=op_path(basic_relationship_operation, module_path=__name__),
+ arguments=arguments),
+ )
+
+ relationship.interfaces[interface.name] = interface
+ ctx.model.relationship.update(relationship)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
+
+ assert dataholder['ctx_name'] == context.operation.RelationshipOperationContext.__name__
+
+    # Task-based assertions
+ assert dataholder['actor_name'] == relationship.name
+ assert interface_name in dataholder['task_name']
+ operations = interface.operations
+ assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member
+ assert dataholder['arguments']['putput'] is True
+
+    # Context-based attributes (sugaring)
+ dependency_node_template = ctx.model.node_template.get_by_name(
+ mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
+ dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ dependent_node_template = ctx.model.node_template.get_by_name(
+ mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
+ dependent_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+ assert dataholder['target_node_template_name'] == dependency_node_template.name
+ assert dataholder['target_node_name'] == dependency_node.name
+ assert dataholder['relationship_name'] == relationship.name
+ assert dataholder['source_node_template_name'] == dependent_node_template.name
+ assert dataholder['source_node_name'] == dependent_node.name
+
+
+def test_invalid_task_operation_id(ctx, thread_executor, dataholder):
+ """
+    Checks that the correct id is used: the task itself is created with id == 1, while it runs
+    on the node with id == 2, so asserting that the operation reports node id 2 verifies that
+    the node's own id is used rather than the task's.
+    :param ctx: workflow context fixture
+    :param thread_executor: thread-based executor fixture
+    :param dataholder: filesystem data holder fixture
+ """
+ interface_name = 'Standard'
+ operation_name = 'create'
+
+ other_node, node = ctx.model.node.list()
+ assert other_node.id == 1
+ assert node.id == 2
+
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ operation_kwargs=dict(function=op_path(get_node_id, module_path=__name__),
+ arguments={'holder_path': dataholder.path})
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
+
+ op_node_id = dataholder[api.task.OperationTask.NAME_FORMAT.format(
+ type='node',
+ name=node.name,
+ interface=interface_name,
+ operation=operation_name
+ )]
+ assert op_node_id == node.id
+ assert op_node_id != other_node.id
+
+
+def test_plugin_workdir(ctx, thread_executor, tmpdir):
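+    """Verifies that a plugin-bound operation writes into its dedicated plugin workdir under the context workdir."""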
+ interface_name = 'Standard'
+ operation_name = 'create'
+
+ plugin = mock.models.create_plugin()
+ ctx.model.plugin.put(plugin)
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ filename = 'test_file'
+ content = 'file content'
+ arguments = {'filename': filename, 'content': content}
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__),
+ plugin=plugin,
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments))
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
+ expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id),
+ plugin.name,
+ filename)
+ assert expected_file.read() == content
+
+
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {}),
+ (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
+])
+def executor(request):
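+    """Parametrized executor fixture: each test using it runs once per executor type (thread and process)."""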
+ ex_cls, kwargs = request.param
+ ex = ex_cls(**kwargs)
+ try:
+ yield ex
+ finally:
+ ex.close()
+
+
+def test_node_operation_logging(ctx, executor):
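+    """Runs a node operation that logs, then asserts the logs are attached to the task and execution."""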
+ interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ arguments = {
+ 'op_start': 'op_start',
+ 'op_end': 'op_end',
+ }
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(
+ function=op_path(logged_operation, module_path=__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments
+ )
+ )
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    _assert_logging(ctx, arguments)
+
+
+def test_relationship_operation_logging(ctx, executor):
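+    """Same as the node operation logging test, for a relationship operation."""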
+ interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
+
+ relationship = ctx.model.relationship.list()[0]
+ arguments = {
+ 'op_start': 'op_start',
+ 'op_end': 'op_end',
+ }
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=op_path(logged_operation, module_path=__name__),
+ arguments=arguments)
+ )
+ relationship.interfaces[interface.name] = interface
+ ctx.model.relationship.update(relationship)
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask(
+ relationship,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=arguments
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    _assert_logging(ctx, arguments)
+
+
+def test_attribute_consumption(ctx, executor, dataholder):
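+    """A node operation sets attributes on the source node; a subsequent relationship operation copies them to the target node, and both results are verified."""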
+ # region Updating node operation
+ node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+ arguments = {'dict_': {'key': 'value'},
+ 'set_test_dict': {'key2': 'value2'}}
+ interface = mock.models.create_interface(
+ source_node.service,
+ node_int_name,
+ node_op_name,
+ operation_kwargs=dict(
+ function=op_path(attribute_altering_operation, module_path=__name__),
+ arguments=arguments)
+ )
+ source_node.interfaces[interface.name] = interface
+ ctx.model.node.update(source_node)
+ # endregion
+
+ # region updating relationship operation
+ rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]
+
+ relationship = ctx.model.relationship.list()[0]
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ rel_int_name,
+ rel_op_name,
+ operation_kwargs=dict(
+ function=op_path(attribute_consuming_operation, module_path=__name__),
+ arguments={'holder_path': dataholder.path}
+ )
+ )
+ relationship.interfaces[interface.name] = interface
+ ctx.model.relationship.update(relationship)
+ # endregion
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.sequence(
+ api.task.OperationTask(
+ source_node,
+ interface_name=node_int_name,
+ operation_name=node_op_name,
+ arguments=arguments
+ ),
+ api.task.OperationTask(
+ relationship,
+ interface_name=rel_int_name,
+ operation_name=rel_op_name,
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ assert len(source_node.attributes) == len(target_node.attributes) == 2
+ assert source_node.attributes['key'] != target_node.attributes['key']
+ assert source_node.attributes['key'].value == \
+ target_node.attributes['key'].value == \
+ dataholder['key'] == 'value'
+
+ assert source_node.attributes['key2'] != target_node.attributes['key2']
+ assert source_node.attributes['key2'].value == \
+ target_node.attributes['key2'].value == \
+ dataholder['key2'] == 'value2'
+
+
+def _assert_logging(ctx, arguments):
+ # The logs should contain the following: Workflow Start, Operation Start, custom operation
+ # log string (op_start), custom operation log string (op_end), Operation End, Workflow End.
+
+ executions = ctx.model.execution.list()
+ assert len(executions) == 1
+ execution = executions[0]
+
+ tasks = ctx.model.task.list(filters={'_stub_type': None})
+ assert len(tasks) == 1
+ task = tasks[0]
+ assert len(task.logs) == 4
+
+ logs = ctx.model.log.list()
+ assert len(logs) == len(execution.logs) == 6
+ assert set(logs) == set(execution.logs)
+
+ assert all(l.execution == execution for l in logs)
+ assert all(l in logs and l.task == task for l in task.logs)
+
+ op_start_log = [l for l in logs if arguments['op_start'] in l.msg and l.level.lower() == 'info']
+ assert len(op_start_log) == 1
+ op_start_log = op_start_log[0]
+
+ op_end_log = [l for l in logs if arguments['op_end'] in l.msg and l.level.lower() == 'debug']
+ assert len(op_end_log) == 1
+ op_end_log = op_end_log[0]
+
+ assert op_start_log.created_at < op_end_log.created_at
+
+
+@operation
+def logged_operation(ctx, **_):
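+    """Logs an info message, sleeps, then logs a debug message."""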
+ ctx.logger.info(ctx.task.arguments['op_start'].value)
+    # sleep so the created_at timestamps of the two log entries differ measurably
+ time.sleep(1)
+ ctx.logger.debug(ctx.task.arguments['op_end'].value)
+
+
+@operation
+def basic_node_operation(ctx, holder_path, **_):
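+    """Records the common task data plus node-specific context sugar (template/node names)."""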
+ holder = helpers.FilesystemDataHolder(holder_path)
+
+ operation_common(ctx, holder)
+ holder['template_name'] = ctx.node_template.name
+ holder['node_name'] = ctx.node.name
+
+
+@operation
+def basic_relationship_operation(ctx, holder_path, **_):
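+    """Records the common task data plus relationship-specific context sugar (source/target names)."""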
+ holder = helpers.FilesystemDataHolder(holder_path)
+
+ operation_common(ctx, holder)
+ holder['target_node_template_name'] = ctx.target_node_template.name
+ holder['target_node_name'] = ctx.target_node.name
+ holder['relationship_name'] = ctx.relationship.name
+ holder['source_node_template_name'] = ctx.source_node_template.name
+ holder['source_node_name'] = ctx.source_node.name
+
+
+def operation_common(ctx, holder):
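+    """Records the context class name and the task's actor, name, function and arguments."""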
+ holder['ctx_name'] = ctx.__class__.__name__
+
+ holder['actor_name'] = ctx.task.actor.name
+ holder['task_name'] = ctx.task.name
+ holder['function'] = ctx.task.function
+ holder['arguments'] = dict(i.unwrapped for i in ctx.task.arguments.itervalues())
+
+
+@operation
+def get_node_id(ctx, holder_path, **_):
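+    """Stores the current node's id in the holder, keyed by the context name (the formatted task name)."""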
+ helpers.FilesystemDataHolder(holder_path)[ctx.name] = ctx.node.id
+
+
+@operation
+def _test_plugin_workdir(ctx, filename, content):
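+    """Writes the given content to a file inside the operation's plugin workdir."""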
+ with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
+ f.write(content)
+
+
+@operation
+def attribute_altering_operation(ctx, dict_, set_test_dict, **_):
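+    """Sets node attributes both via bulk update and via per-key assignment."""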
+ ctx.node.attributes.update(dict_)
+
+ for key, value in set_test_dict.items():
+ ctx.node.attributes[key] = value
+
+
+@operation
+def attribute_consuming_operation(ctx, holder_path, **_):
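+    """Copies the source node's attributes to the target node and records them in the holder."""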
+ holder = helpers.FilesystemDataHolder(holder_path)
+ ctx.target_node.attributes.update(ctx.source_node.attributes)
+ holder.update(**ctx.target_node.attributes)
+
+ ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2']
+ holder['key2'] = ctx.target_node.attributes['key2']