diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py | 498 |
1 files changed, 498 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py new file mode 100644 index 0000000..111e121 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_operation.py @@ -0,0 +1,498 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +import pytest + +from aria.orchestrator.workflows.executor import process, thread + +from aria import ( + workflow, + operation, +) +from aria.orchestrator import context +from aria.orchestrator.workflows import api + +import tests +from tests import ( + mock, + storage, + helpers +) +from . import ( + op_path, + execute, +) + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple( + str(tmpdir), + context_kwargs=dict(workdir=str(tmpdir.join('workdir'))) + ) + yield context + storage.release_sqlite_storage(context.model) + + +@pytest.fixture +def thread_executor(): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + +@pytest.fixture +def dataholder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = helpers.FilesystemDataHolder(dataholder_path) + return holder + + +def test_node_operation_task_execution(ctx, thread_executor, dataholder): + interface_name = 'Standard' + operation_name = 'create' + + arguments = {'putput': True, 'holder_path': dataholder.path} + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict(function=op_path(basic_node_operation, module_path=__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) + + assert dataholder['ctx_name'] == context.operation.NodeOperationContext.__name__ + + # Task bases assertions + assert dataholder['actor_name'] == node.name + assert dataholder['task_name'] == api.task.OperationTask.NAME_FORMAT.format( + type='node', + name=node.name, + interface=interface_name, + operation=operation_name + ) + operations = interface.operations + assert len(operations) == 1 + assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member + assert dataholder['arguments']['putput'] is True + + # Context based attributes (sugaring) + assert dataholder['template_name'] == node.node_template.name + assert dataholder['node_name'] == node.name + + +def test_relationship_operation_task_execution(ctx, thread_executor, dataholder): + interface_name = 'Configure' + operation_name = 'post_configure' + + arguments = {'putput': True, 'holder_path': dataholder.path} + relationship = ctx.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + interface_name, + operation_name, + operation_kwargs=dict(function=op_path(basic_relationship_operation, module_path=__name__), + arguments=arguments), + ) + + relationship.interfaces[interface.name] = interface + ctx.model.relationship.update(relationship) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + relationship, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) + + assert dataholder['ctx_name'] == context.operation.RelationshipOperationContext.__name__ + + # Task bases assertions + assert dataholder['actor_name'] == relationship.name + assert interface_name in dataholder['task_name'] + operations = interface.operations + assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member + assert dataholder['arguments']['putput'] is True + + # Context based attributes (sugaring) + dependency_node_template = ctx.model.node_template.get_by_name( + mock.models.DEPENDENCY_NODE_TEMPLATE_NAME) + dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + dependent_node_template = ctx.model.node_template.get_by_name( + mock.models.DEPENDENT_NODE_TEMPLATE_NAME) + dependent_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + + assert dataholder['target_node_template_name'] == dependency_node_template.name + assert dataholder['target_node_name'] == dependency_node.name + assert dataholder['relationship_name'] == relationship.name + assert dataholder['source_node_template_name'] == dependent_node_template.name + assert dataholder['source_node_name'] == dependent_node.name + + +def test_invalid_task_operation_id(ctx, thread_executor, dataholder): + """ + Checks that the right id is used. The task created with id == 1, thus running the task on + node with id == 2. will check that indeed the node uses the correct id. + :param ctx: + :param thread_executor: + :return: + """ + interface_name = 'Standard' + operation_name = 'create' + + other_node, node = ctx.model.node.list() + assert other_node.id == 1 + assert node.id == 2 + + interface = mock.models.create_interface( + node.service, + interface_name=interface_name, + operation_name=operation_name, + operation_kwargs=dict(function=op_path(get_node_id, module_path=__name__), + arguments={'holder_path': dataholder.path}) + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) + + op_node_id = dataholder[api.task.OperationTask.NAME_FORMAT.format( + type='node', + name=node.name, + interface=interface_name, + operation=operation_name + )] + assert op_node_id == node.id + assert op_node_id != other_node.id + + +def test_plugin_workdir(ctx, thread_executor, tmpdir): + interface_name = 'Standard' + operation_name = 'create' + + plugin = mock.models.create_plugin() + ctx.model.plugin.put(plugin) + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + filename = 'test_file' + content = 'file content' + arguments = {'filename': filename, 'content': content} + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict( + function='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__), + plugin=plugin, + arguments=arguments) + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks(api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments)) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) + expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id), + plugin.name, + filename) + assert expected_file.read() == content + + +@pytest.fixture(params=[ + (thread.ThreadExecutor, {}), + (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), +]) +def executor(request): + ex_cls, kwargs = request.param + ex = ex_cls(**kwargs) + try: + yield ex + finally: + ex.close() + + +def test_node_operation_logging(ctx, executor): + interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + arguments = { + 'op_start': 'op_start', + 'op_end': 'op_end', + } + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name, + operation_kwargs=dict( + function=op_path(logged_operation, module_path=__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments + ) + ) + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + _assert_loggins(ctx, arguments) + + +def test_relationship_operation_logging(ctx, executor): + interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0] + + relationship = ctx.model.relationship.list()[0] + arguments = { + 'op_start': 'op_start', + 'op_end': 'op_end', + } + interface = mock.models.create_interface( + relationship.source_node.service, + interface_name, + operation_name, + operation_kwargs=dict(function=op_path(logged_operation, module_path=__name__), + arguments=arguments) + ) + relationship.interfaces[interface.name] = interface + ctx.model.relationship.update(relationship) + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask( + relationship, + interface_name=interface_name, + operation_name=operation_name, + arguments=arguments + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + _assert_loggins(ctx, arguments) + + +def test_attribute_consumption(ctx, executor, dataholder): + # region Updating node operation + node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + + source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + + arguments = {'dict_': {'key': 'value'}, + 'set_test_dict': {'key2': 'value2'}} + interface = mock.models.create_interface( + source_node.service, + node_int_name, + node_op_name, + operation_kwargs=dict( + function=op_path(attribute_altering_operation, module_path=__name__), + arguments=arguments) + ) + source_node.interfaces[interface.name] = interface + ctx.model.node.update(source_node) + # endregion + + # region updating relationship operation + rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2] + + relationship = ctx.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + rel_int_name, + rel_op_name, + operation_kwargs=dict( + function=op_path(attribute_consuming_operation, module_path=__name__), + arguments={'holder_path': dataholder.path} + ) + ) + relationship.interfaces[interface.name] = interface + ctx.model.relationship.update(relationship) + # endregion + + @workflow + def basic_workflow(graph, **_): + graph.sequence( + api.task.OperationTask( + source_node, + interface_name=node_int_name, + operation_name=node_op_name, + arguments=arguments + ), + api.task.OperationTask( + relationship, + interface_name=rel_int_name, + operation_name=rel_op_name, + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + assert len(source_node.attributes) == len(target_node.attributes) == 2 + assert source_node.attributes['key'] != target_node.attributes['key'] + assert source_node.attributes['key'].value == \ + target_node.attributes['key'].value == \ + dataholder['key'] == 'value' + + assert source_node.attributes['key2'] != target_node.attributes['key2'] + assert source_node.attributes['key2'].value == \ + target_node.attributes['key2'].value == \ + dataholder['key2'] == 'value2' + + +def _assert_loggins(ctx, arguments): + # The logs should contain the following: Workflow Start, Operation Start, custom operation + # log string (op_start), custom operation log string (op_end), Operation End, Workflow End. + + executions = ctx.model.execution.list() + assert len(executions) == 1 + execution = executions[0] + + tasks = ctx.model.task.list(filters={'_stub_type': None}) + assert len(tasks) == 1 + task = tasks[0] + assert len(task.logs) == 4 + + logs = ctx.model.log.list() + assert len(logs) == len(execution.logs) == 6 + assert set(logs) == set(execution.logs) + + assert all(l.execution == execution for l in logs) + assert all(l in logs and l.task == task for l in task.logs) + + op_start_log = [l for l in logs if arguments['op_start'] in l.msg and l.level.lower() == 'info'] + assert len(op_start_log) == 1 + op_start_log = op_start_log[0] + + op_end_log = [l for l in logs if arguments['op_end'] in l.msg and l.level.lower() == 'debug'] + assert len(op_end_log) == 1 + op_end_log = op_end_log[0] + + assert op_start_log.created_at < op_end_log.created_at + + +@operation +def logged_operation(ctx, **_): + ctx.logger.info(ctx.task.arguments['op_start'].value) + # enables to check the relation between the created_at field properly + time.sleep(1) + ctx.logger.debug(ctx.task.arguments['op_end'].value) + + +@operation +def basic_node_operation(ctx, holder_path, **_): + holder = helpers.FilesystemDataHolder(holder_path) + + operation_common(ctx, holder) + holder['template_name'] = ctx.node_template.name + holder['node_name'] = ctx.node.name + + +@operation +def basic_relationship_operation(ctx, holder_path, **_): + holder = helpers.FilesystemDataHolder(holder_path) + + operation_common(ctx, holder) + holder['target_node_template_name'] = ctx.target_node_template.name + holder['target_node_name'] = ctx.target_node.name + holder['relationship_name'] = ctx.relationship.name + holder['source_node_template_name'] = ctx.source_node_template.name + holder['source_node_name'] = ctx.source_node.name + + +def operation_common(ctx, holder): + holder['ctx_name'] = ctx.__class__.__name__ + + holder['actor_name'] = ctx.task.actor.name + holder['task_name'] = ctx.task.name + holder['function'] = ctx.task.function + holder['arguments'] = dict(i.unwrapped for i in ctx.task.arguments.itervalues()) + + +@operation +def get_node_id(ctx, holder_path, **_): + helpers.FilesystemDataHolder(holder_path)[ctx.name] = ctx.node.id + + +@operation +def _test_plugin_workdir(ctx, filename, content): + with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f: + f.write(content) + + +@operation +def attribute_altering_operation(ctx, dict_, set_test_dict, **_): + ctx.node.attributes.update(dict_) + + for key, value in set_test_dict.items(): + ctx.node.attributes[key] = value + + +@operation +def attribute_consuming_operation(ctx, holder_path, **_): + holder = helpers.FilesystemDataHolder(holder_path) + ctx.target_node.attributes.update(ctx.source_node.attributes) + holder.update(**ctx.target_node.attributes) + + ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2'] + holder['key2'] = ctx.target_node.attributes['key2'] |