summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py
new file mode 100644
index 0000000..091e23c
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+import tests
+from tests import mock
+from tests import storage
+
+TEST_FILE_CONTENT = 'CONTENT'
+TEST_FILE_ENTRY_ID = 'entry'
+TEST_FILE_NAME = 'test_file'
+
+
+def test_serialize_operation_context(context, executor, tmpdir):
+ test_file = tmpdir.join(TEST_FILE_NAME)
+ test_file.write(TEST_FILE_CONTENT)
+ resource = context.resource
+ resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
+
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ plugin = mock.models.create_plugin()
+ context.model.plugin.put(plugin)
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(function=_operation_mapping(),
+ plugin=plugin)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
+ return graph
+
+
+@operation
+def _mock_operation(ctx):
+ # We test several things in this operation
+ # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
+ # a correct ctx.task.function tells us we kept the correct task_id
+ assert ctx.task.function == _operation_mapping()
+ # a correct ctx.node.name tells us we kept the correct actor_id
+ assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
+ # a correct ctx.name tells us we kept the correct name
+ assert ctx.name is not None
+ assert ctx.name == ctx.task.name
+ # a correct ctx.deployment.name tells us we kept the correct deployment_id
+ assert ctx.service.name == mock.models.SERVICE_NAME
+ # Here we test that the resource storage was properly re-created
+ test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
+ assert test_file_content == TEST_FILE_CONTENT
+ # a non empty plugin workdir tells us that we kept the correct base_workdir
+ assert ctx.plugin_workdir is not None
+
+
+def _operation_mapping():
+ return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(
+ str(tmpdir),
+ context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+ )
+
+ yield result
+ storage.release_sqlite_storage(result.model)