summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py172
1 files changed, 172 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
new file mode 100644
index 0000000..e050d18
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import Queue
+import subprocess
+
+import pytest
+import psutil
+import retrying
+
+import aria
+
+from aria import operation
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.utils.plugin import create as create_plugin
+from aria.orchestrator.workflows.executor import process
+
+import tests.storage
+import tests.resources
+from tests.helpers import FilesystemDataHolder
+from tests.fixtures import ( # pylint: disable=unused-import
+ plugins_dir,
+ plugin_manager,
+)
+from . import MockContext
+
+
+class TestProcessExecutor(object):
+
+ def test_plugin_execution(self, executor, mock_plugin, model, queue):
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+ )
+
+ executor.execute(ctx)
+ error = queue.get(timeout=60)
+ # tests/resources/plugins/mock-plugin1 is the plugin installed
+ # during this tests setup. The module mock_plugin1 contains a single
+ # operation named "operation" which calls an entry point defined in the plugin's
+ # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
+ # The "operation" operation that called this subprocess, then raises a RuntimeError
+ # with that subprocess output as the error message.
+ # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
+ # and PATH (entry point) are properly updated in the subprocess in which the task is
+ # running.
+ assert isinstance(error, RuntimeError)
+ assert error.message == 'mock-plugin-output'
+
+ def test_closed(self, executor, model):
+ executor.close()
+ with pytest.raises(RuntimeError) as exc_info:
+ executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
+ assert 'closed' in exc_info.value.message
+
+ def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+ freeze_script_path = str(tmpdir.join('freeze_script'))
+ with open(freeze_script_path, 'w+b') as f:
+ f.write(
+ '''import time
+while True:
+ time.sleep(5)
+ '''
+ )
+ holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+ script_path_argument = models.Argument.wrap('freezing_script_path',
+ str(tmpdir.join('freeze_script')))
+
+ model.argument.put(holder_path_argument)
+ model.argument.put(script_path_argument)
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(
+ function='{0}.{1}'.format(__name__, freezing_task.__name__),
+ arguments=dict(holder_path=holder_path_argument,
+ freezing_script_path=script_path_argument)),
+ )
+
+ executor.execute(ctx)
+
+ @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+ def wait_for_extra_process_id():
+ return fs_test_holder.get('subproc', False)
+
+ task_pid = executor._tasks[ctx.task.id].proc.pid
+ extra_process_pid = wait_for_extra_process_id()
+
+ assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
+ executor.terminate(ctx.task.id)
+
+ # Give a chance to the processes to terminate
+ time.sleep(2)
+
+ # all processes should be either zombies or non existent
+ pids = [task_pid, extra_process_pid]
+ for pid in pids:
+ if pid in psutil.pids():
+ assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
+ else:
+ # making the test more readable
+ assert pid not in psutil.pids()
+
+
+@pytest.fixture
+def queue():
+ _queue = Queue.Queue()
+
+ def handler(_, exception=None, **kwargs):
+ _queue.put(exception)
+
+ events.on_success_task_signal.connect(handler)
+ events.on_failure_task_signal.connect(handler)
+ try:
+ yield _queue
+ finally:
+ events.on_success_task_signal.disconnect(handler)
+ events.on_failure_task_signal.disconnect(handler)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = FilesystemDataHolder(dataholder_path)
+ return holder
+
+
+@pytest.fixture
+def executor(plugin_manager):
+ result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+@pytest.fixture
+def mock_plugin(plugin_manager, tmpdir):
+ source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+ plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
+ return plugin_manager.install(source=plugin_path)
+
+
+@pytest.fixture
+def model(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, freezing_script_path, **_):
+ holder = FilesystemDataHolder(holder_path)
+ holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid
+ while True:
+ time.sleep(5)