diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py new file mode 100644 index 0000000..091e23c --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation +import tests +from tests import mock +from tests import storage + +TEST_FILE_CONTENT = 'CONTENT' +TEST_FILE_ENTRY_ID = 'entry' +TEST_FILE_NAME = 'test_file' + + +def test_serialize_operation_context(context, executor, tmpdir): + test_file = tmpdir.join(TEST_FILE_NAME) + test_file.write(TEST_FILE_CONTENT) + resource = context.resource + resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) + + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + plugin = mock.models.create_plugin() + context.model.plugin.put(plugin) + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict(function=_operation_mapping(), + plugin=plugin) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(context) + + +@workflow +def _mock_workflow(ctx, graph): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op')) + return graph + + +@operation +def _mock_operation(ctx): + # We test several things in this operation + # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created + # a correct ctx.task.function tells us we kept the correct task_id + assert ctx.task.function == _operation_mapping() + # a correct ctx.node.name tells us we kept the correct actor_id + assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME + # a correct ctx.name tells us we kept the correct name + assert ctx.name is not None + assert ctx.name == ctx.task.name + # a correct ctx.deployment.name tells us we kept the correct deployment_id + assert ctx.service.name == mock.models.SERVICE_NAME + # Here we test that the resource storage was properly re-created + test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME) + assert test_file_content == TEST_FILE_CONTENT + # a non empty plugin workdir tells us that we kept the correct base_workdir + assert ctx.plugin_workdir is not None + + +def _operation_mapping(): + return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation) + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + try: + yield result + finally: + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple( + str(tmpdir), + context_kwargs=dict(workdir=str(tmpdir.join('workdir'))) + ) + + yield result + storage.release_sqlite_storage(result.model) |