diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py new file mode 100644 index 0000000..7f33318 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py @@ -0,0 +1,598 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import pytest + +from aria import workflow +from aria.orchestrator import events +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException +from aria.orchestrator.execution_plugin import operations +from aria.orchestrator.execution_plugin.exceptions import ProcessException +from aria.orchestrator.execution_plugin import local +from aria.orchestrator.execution_plugin import constants +from aria.orchestrator.workflows.executor import process +from aria.orchestrator.workflows.core import engine, graph_compiler + +from tests import mock +from tests import storage +from tests.orchestrator.workflows.helpers import events_collector + +IS_WINDOWS = os.name == 'nt' + + +class TestLocalRunScript(object): + + def test_script_path_parameter(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes map key = value + ''', + windows_script=''' + ctx node attributes map key = value + ''') + props = self._run( + executor, workflow_context, + script_path=script_path) + assert props['map'].value['key'] == 'value' + + def test_process_env(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes map key1 = "$key1" + ctx node attributes map key2 = "$key2" + ''', + windows_script=''' + ctx node attributes map key1 = %key1% + ctx node attributes map key2 = %key2% + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'env': { + 'key1': 'value1', + 'key2': 'value2' + } + }) + p_map = props['map'].value + assert p_map['key1'] == 'value1' + assert p_map['key2'] == 'value2' + + def test_process_cwd(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes map cwd = "$PWD" + ''', + windows_script=''' + ctx node attributes map cwd = %CD% + ''') + tmpdir = str(tmpdir) + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'cwd': tmpdir + }) + p_map = props['map'].value + assert p_map['cwd'] == tmpdir + + def test_process_command_prefix(self, executor, workflow_context, tmpdir): + use_ctx = 'ctx node attributes map key = value' + python_script = ['import subprocess', + 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)] + python_script = '\n'.join(python_script) + script_path = self._create_script( + tmpdir, + linux_script=python_script, + windows_script=python_script, + windows_suffix='', + linux_suffix='') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'env': {'TEST_KEY': 'value'}, + 'command_prefix': 'python' + }) + p_map = props['map'].value + assert p_map['key'] == 'value' + + def test_process_args(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes map arg1 = "$1" + ctx node attributes map arg2 = "$2" + ''', + windows_script=''' + ctx node attributes map arg1 = %1 + ctx node attributes map arg2 = %2 + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'args': ['"arg with spaces"', 'arg2'] + }) + assert props['map'].value['arg1'] == 'arg with spaces' + assert props['map'].value['arg2'] == 'arg2' + + def test_no_script_path(self, executor, workflow_context): + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=None) + assert isinstance(exception, TaskAbortException) + assert 'script_path' in exception.message + + def test_script_error(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + echo 123123 + command_that_does_not_exist [ ] + ''', + windows_script=''' + @echo off + echo 123123 + command_that_does_not_exist [ ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert exception.exit_code == 1 if IS_WINDOWS else 127 + assert exception.stdout.strip() == '123123' + assert 'command_that_does_not_exist' in exception.stderr + + def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx property_that_does_not_exist + ''', + windows_script=''' + ctx property_that_does_not_exist + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert exception.exit_code == 1 + assert 'RequestError' in exception.stderr + assert 'property_that_does_not_exist' in exception.stderr + + def test_python_script(self, executor, workflow_context, tmpdir): + script = ''' +from aria.orchestrator.execution_plugin import ctx, inputs +if __name__ == '__main__': + ctx.node.attributes['key'] = inputs['key'] +''' + suffix = '.py' + script_path = self._create_script( + tmpdir, + linux_script=script, + windows_script=script, + linux_suffix=suffix, + windows_suffix=suffix) + props = self._run( + executor, workflow_context, + script_path=script_path, + arguments={'key': 'value'}) + assert props['key'].value == 'value' + + @pytest.mark.parametrize( + 'value', ['string-value', [1, 2, 3], 999, 3.14, False, + {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}]) + def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes key = "${input_as_env_var}" + ''', + windows_script=''' + ctx node attributes key = "%input_as_env_var%" + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + env_var=value) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) + assert expected == value + + @pytest.mark.parametrize('value', ['override', {'key': 'value'}]) + def test_explicit_env_variables_inputs_override( + self, executor, workflow_context, tmpdir, value): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes key = "${input_as_env_var}" + ''', + windows_script=''' + ctx node attributes key = "%input_as_env_var%" + ''') + + props = self._run( + executor, workflow_context, + script_path=script_path, + env_var='test-value', + process={ + 'env': { + 'input_as_env_var': value + } + }) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) + assert expected == value + + def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node attributes nonexistent + ''', + windows_script=''' + ctx node attributes nonexistent + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert 'RequestError' in exception.stderr + assert 'nonexistent' in exception.stderr + + def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx -j node attributes nonexistent + ''', + windows_script=''' + ctx -j node attributes nonexistent + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert 'RequestError' in exception.stderr + assert 'nonexistent' in exception.stderr + + def test_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task abort [ abort-message ] + ''', + windows_script=''' + ctx task abort [ abort-message ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == 'abort-message' + + def test_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry [ retry-message ] + ''', + windows_script=''' + ctx task retry [ retry-message ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskRetryException) + assert exception.message == 'retry-message' + + def test_retry_with_interval(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry [ retry-message @100 ] + ''', + windows_script=''' + ctx task retry [ retry-message @100 ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskRetryException) + assert exception.message == 'retry-message' + assert exception.retry_interval == 100 + + def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task retry [ retry-message ] + ctx task abort [ should-raise-a-runtime-error ] + ''', + windows_script=''' + ctx task retry [ retry-message ] + ctx task abort [ should-raise-a-runtime-error ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task abort [ abort-message ] + ctx task retry [ should-raise-a-runtime-error ] + ''', + windows_script=''' + ctx task abort [ abort-message ] + ctx task retry [ should-raise-a-runtime-error ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task abort [ abort-message ] + ctx task abort [ should-raise-a-runtime-error ] + ''', + windows_script=''' + ctx task abort [ abort-message ] + ctx task abort [ should-raise-a-runtime-error ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task retry [ retry-message ] + ctx task retry [ should-raise-a-runtime-error ] + ''', + windows_script=''' + ctx task retry [ retry-message ] + ctx task retry [ should-raise-a-runtime-error ] + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir): + log_path = tmpdir.join('temp.log') + message = 'message' + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry [ "{0}" ] 2> {1} + echo should-not-run > {1} + '''.format(message, log_path), + windows_script=''' + ctx task retry [ "{0}" ] 2> {1} + if %errorlevel% neq 0 exit /b %errorlevel% + echo should-not-run > {1} + '''.format(message, log_path)) + with pytest.raises(ExecutorException): + self._run( + executor, workflow_context, + script_path=script_path) + assert log_path.read().strip() == message + + def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir): + log_path = tmpdir.join('temp.log') + message = 'message' + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task abort [ "{0}" ] 2> {1} + echo should-not-run > {1} + '''.format(message, log_path), + windows_script=''' + ctx task abort [ "{0}" ] 2> {1} + if %errorlevel% neq 0 exit /b %errorlevel% + echo should-not-run > {1} + '''.format(message, log_path)) + with pytest.raises(ExecutorException): + self._run( + executor, workflow_context, + script_path=script_path) + assert log_path.read().strip() == message + + def _create_script(self, + tmpdir, + linux_script, + windows_script, + windows_suffix='.bat', + linux_suffix=''): + suffix = windows_suffix if IS_WINDOWS else linux_suffix + script = windows_script if IS_WINDOWS else linux_script + script_path = tmpdir.join('script{0}'.format(suffix)) + script_path.write(script) + return str(script_path) + + def _run_and_get_task_exception(self, *args, **kwargs): + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + with pytest.raises(ExecutorException): + self._run(*args, **kwargs) + return collected[signal][0]['kwargs']['exception'] + + def _run(self, + executor, + workflow_context, + script_path, + process=None, + env_var='value', + arguments=None): + local_script_path = script_path + script_path = os.path.basename(local_script_path) if local_script_path else '' + arguments = arguments or {} + process = process or {} + if script_path: + workflow_context.resource.service.upload( + entry_id=str(workflow_context.service.id), + source=local_script_path, + path=script_path) + + arguments.update({ + 'script_path': script_path, + 'process': process, + 'input_as_env_var': env_var + }) + + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict( + function='{0}.{1}'.format( + operations.__name__, + operations.run_script_locally.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + workflow_context.model.node.update(node) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.OperationTask( + node, + interface_name='test', + operation_name='op', + arguments=arguments)) + return graph + tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) + eng = engine.Engine({executor.__class__: executor}) + eng.execute(workflow_context) + return workflow_context.model.node.get_by_name( + mock.models.DEPENDENCY_NODE_NAME).attributes + + @pytest.fixture + def executor(self): + result = process.ProcessExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple(str(tmpdir), inmemory=False) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class BaseTestConfiguration(object): + + @pytest.fixture(autouse=True) + def mock_execute(self, mocker): + def eval_func(**_): + self.called = 'eval' + + def execute_func(process, **_): + self.process = process + self.called = 'execute' + self.process = {} + self.called = None + mocker.patch.object(local, '_execute_func', execute_func) + mocker.patch.object(local, '_eval_script_func', eval_func) + + class Ctx(object): + @staticmethod + def download_resource(destination, *args, **kwargs): + return destination + + def _run(self, script_path, process=None): + local.run_script( + script_path=script_path, + process=process, + ctx=self.Ctx) + + +class TestPowerShellConfiguration(BaseTestConfiguration): + + def test_implicit_powershell_call_with_ps1_extension(self): + self._run(script_path='script_path.ps1') + assert self.process['command_prefix'] == 'powershell' + + def test_command_prefix_is_overridden_for_ps1_extension(self): + self._run(script_path='script_path.ps1', + process={'command_prefix': 'bash'}) + assert self.process['command_prefix'] == 'bash' + + def test_explicit_powershell_call(self): + self._run(script_path='script_path.ps1', + process={'command_prefix': 'powershell'}) + assert self.process['command_prefix'] == 'powershell' + + +class TestEvalPythonConfiguration(BaseTestConfiguration): + + def test_explicit_eval_without_py_extension(self): + self._run(script_path='script_path', + process={'eval_python': True}) + assert self.called == 'eval' + + def test_explicit_eval_with_py_extension(self): + self._run(script_path='script_path.py', + process={'eval_python': True}) + assert self.called == 'eval' + + def test_implicit_eval(self): + self._run(script_path='script_path.py') + assert self.called == 'eval' + + def test_explicit_execute_without_py_extension(self): + self._run(script_path='script_path', + process={'eval_python': False}) + assert self.called == 'execute' + + def test_explicit_execute_with_py_extension(self): + self._run(script_path='script_path.py', + process={'eval_python': False}) + assert self.called == 'execute' + + def test_implicit_execute(self): + self._run(script_path='script_path') + assert self.called == 'execute' |