summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
blob: e050d18c6774c902590d3ab0c641461a7148208e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import Queue
import subprocess

import pytest
import psutil
import retrying

import aria

from aria import operation
from aria.modeling import models
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process

import tests.storage
import tests.resources
from tests.helpers import FilesystemDataHolder
from tests.fixtures import (  # pylint: disable=unused-import
    plugins_dir,
    plugin_manager,
)
from . import MockContext


class TestProcessExecutor(object):
    """Tests for ``process.ProcessExecutor``: plugin execution, use after close, termination."""

    def test_plugin_execution(self, executor, mock_plugin, model, queue):
        """Run an operation from the installed mock plugin and check the error it reports."""
        ctx = MockContext(
            model,
            task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
        )

        executor.execute(ctx)
        error = queue.get(timeout=60)
        # tests/resources/plugins/mock-plugin1 is the plugin installed
        # during this test's setup. The module mock_plugin1 contains a single
        # operation named "operation" which calls an entry point defined in the plugin's
        # setup.py. This entry point simply prints 'mock-plugin-output' to stdout.
        # The "operation" operation that called this subprocess then raises a RuntimeError
        # with that subprocess output as the error message.
        # This is what we assert here. This test checks that both the PYTHONPATH (operation)
        # and PATH (entry point) are properly updated in the subprocess in which the task is
        # running.
        assert isinstance(error, RuntimeError)
        assert error.message == 'mock-plugin-output'

    def test_closed(self, executor, model):
        """Executing a task on a closed executor must raise a RuntimeError mentioning 'closed'."""
        executor.close()
        with pytest.raises(RuntimeError) as exc_info:
            executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
        assert 'closed' in exc_info.value.message

    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
        """Terminate a running task and check that it and the process it spawned are gone.

        The task is ``freezing_task`` from this module; it records the pid of the extra
        process it starts in the filesystem data holder, which is how this test learns it.
        """
        freeze_script_path = str(tmpdir.join('freeze_script'))
        with open(freeze_script_path, 'w+b') as f:
            f.write(
                '''import time
while True:
    time.sleep(5)
                '''
            )
        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
        # Reuse the path computed above instead of rebuilding it from tmpdir.
        script_path_argument = models.Argument.wrap('freezing_script_path', freeze_script_path)

        model.argument.put(holder_path_argument)
        model.argument.put(script_path_argument)
        ctx = MockContext(
            model,
            task_kwargs=dict(
                function='{0}.{1}'.format(__name__, freezing_task.__name__),
                arguments=dict(holder_path=holder_path_argument,
                               freezing_script_path=script_path_argument)),
        )

        executor.execute(ctx)

        # Poll every 500ms, for up to 60s, until the task has published the extra pid.
        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
        def wait_for_extra_process_id():
            return fs_test_holder.get('subproc', False)

        task_pid = executor._tasks[ctx.task.id].proc.pid
        extra_process_pid = wait_for_extra_process_id()

        assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
        executor.terminate(ctx.task.id)

        # Give a chance to the processes to terminate
        time.sleep(2)

        # All processes should be either zombies or non existent. The status is queried
        # directly rather than checking psutil.pids() first: a process may disappear
        # between such a check and the Process() lookup, which would raise NoSuchProcess
        # and fail the test even though the process was terminated as expected.
        for pid in (task_pid, extra_process_pid):
            try:
                status = psutil.Process(pid).status()
            except psutil.NoSuchProcess:
                # The process no longer exists, which is an acceptable outcome.
                continue
            assert status == psutil.STATUS_ZOMBIE


@pytest.fixture
def queue():
    """Yield a queue that receives the ``exception`` value of each task success/failure signal.

    The handler is connected to both task signals for the duration of the test and
    disconnected again afterwards.
    """
    outcomes = Queue.Queue()
    task_signals = (events.on_success_task_signal, events.on_failure_task_signal)

    def handler(_, exception=None, **kwargs):
        # ``exception`` keeps its default of None when the signal does not supply one.
        outcomes.put(exception)

    for signal in task_signals:
        signal.connect(handler)
    try:
        yield outcomes
    finally:
        for signal in task_signals:
            signal.disconnect(handler)


@pytest.fixture
def fs_test_holder(tmpdir):
    """Return a ``FilesystemDataHolder`` backed by the 'dataholder' path inside ``tmpdir``."""
    return FilesystemDataHolder(str(tmpdir.join('dataholder')))


@pytest.fixture
def executor(plugin_manager):
    """Yield a ``ProcessExecutor`` using ``plugin_manager``; it is closed on teardown."""
    process_executor = process.ProcessExecutor(
        plugin_manager=plugin_manager,
        python_path=[tests.ROOT_DIR],
    )
    try:
        yield process_executor
    finally:
        process_executor.close()


@pytest.fixture
def mock_plugin(plugin_manager, tmpdir):
    """Package the 'mock-plugin1' test resource into ``tmpdir`` and install it.

    Returns whatever ``plugin_manager.install`` returns for the created plugin.
    """
    plugin_source_dir = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
    packaged_plugin = create_plugin(source=plugin_source_dir, destination_dir=str(tmpdir))
    return plugin_manager.install(source=packaged_plugin)


@pytest.fixture
def model(tmpdir):
    """Yield an application model storage rooted at ``tmpdir``; released after the test."""
    storage_kwargs = dict(base_dir=str(tmpdir))
    model_storage = aria.application_model_storage(
        aria.storage.sql_mapi.SQLAlchemyModelAPI,
        initiator_kwargs=storage_kwargs,
    )
    yield model_storage
    tests.storage.release_sqlite_storage(model_storage)


@operation
def freezing_task(holder_path, freezing_script_path, **_):
    """Operation that spawns a never-ending child process and then blocks forever.

    The child's pid is stored under the 'subproc' key of the filesystem data holder at
    ``holder_path`` so that code outside this process can find it.

    :param holder_path: path backing the ``FilesystemDataHolder`` to write the pid into
    :param freezing_script_path: path of the Python script to run in the child process
    """
    holder = FilesystemDataHolder(holder_path)
    # Run the interpreter directly (no shell=True). With shell=True and a list argument
    # on POSIX, only the first item is the shell command and the remaining items become
    # arguments to the shell itself, so the script would never be executed and the
    # recorded pid would not be that of the process running it.
    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path]).pid
    while True:
        time.sleep(5)