summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_serialize.py
blob: 091e23c2f98654571c43d6e6c30ae02335325ee4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine, graph_compiler
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
import tests
from tests import mock
from tests import storage

TEST_FILE_CONTENT = 'CONTENT'
TEST_FILE_ENTRY_ID = 'entry'
TEST_FILE_NAME = 'test_file'


def test_serialize_operation_context(context, executor, tmpdir):
    test_file = tmpdir.join(TEST_FILE_NAME)
    test_file.write(TEST_FILE_CONTENT)
    resource = context.resource
    resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))

    node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    plugin = mock.models.create_plugin()
    context.model.plugin.put(plugin)
    interface = mock.models.create_interface(
        node.service,
        'test',
        'op',
        operation_kwargs=dict(function=_operation_mapping(),
                              plugin=plugin)
    )
    node.interfaces[interface.name] = interface
    context.model.node.update(node)

    graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
    graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
    eng = engine.Engine({executor.__class__: executor})
    eng.execute(context)


@workflow
def _mock_workflow(ctx, graph):
    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
    graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
    return graph


@operation
def _mock_operation(ctx):
    # We test several things in this operation
    # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
    # a correct ctx.task.function tells us we kept the correct task_id
    assert ctx.task.function == _operation_mapping()
    # a correct ctx.node.name tells us we kept the correct actor_id
    assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
    # a correct ctx.name tells us we kept the correct name
    assert ctx.name is not None
    assert ctx.name == ctx.task.name
    # a correct ctx.deployment.name tells us we kept the correct deployment_id
    assert ctx.service.name == mock.models.SERVICE_NAME
    # Here we test that the resource storage was properly re-created
    test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
    assert test_file_content == TEST_FILE_CONTENT
    # a non empty plugin workdir tells us that we kept the correct base_workdir
    assert ctx.plugin_workdir is not None


def _operation_mapping():
    return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)


@pytest.fixture
def executor():
    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
    try:
        yield result
    finally:
        result.close()


@pytest.fixture
def context(tmpdir):
    result = mock.context.simple(
        str(tmpdir),
        context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
    )

    yield result
    storage.release_sqlite_storage(result.model)