summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py598
1 files changed, 598 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py
new file mode 100644
index 0000000..7f33318
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/execution_plugin/test_local.py
@@ -0,0 +1,598 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+import pytest
+
+from aria import workflow
+from aria.orchestrator import events
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
+from aria.orchestrator.execution_plugin import operations
+from aria.orchestrator.execution_plugin.exceptions import ProcessException
+from aria.orchestrator.execution_plugin import local
+from aria.orchestrator.execution_plugin import constants
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import engine, graph_compiler
+
+from tests import mock
+from tests import storage
+from tests.orchestrator.workflows.helpers import events_collector
+
+IS_WINDOWS = os.name == 'nt'
+
+
+class TestLocalRunScript(object):
+
+ def test_script_path_parameter(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes map key = value
+ ''',
+ windows_script='''
+ ctx node attributes map key = value
+ ''')
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path)
+ assert props['map'].value['key'] == 'value'
+
+ def test_process_env(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes map key1 = "$key1"
+ ctx node attributes map key2 = "$key2"
+ ''',
+ windows_script='''
+ ctx node attributes map key1 = %key1%
+ ctx node attributes map key2 = %key2%
+ ''')
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ process={
+ 'env': {
+ 'key1': 'value1',
+ 'key2': 'value2'
+ }
+ })
+ p_map = props['map'].value
+ assert p_map['key1'] == 'value1'
+ assert p_map['key2'] == 'value2'
+
+ def test_process_cwd(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes map cwd = "$PWD"
+ ''',
+ windows_script='''
+ ctx node attributes map cwd = %CD%
+ ''')
+ tmpdir = str(tmpdir)
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ process={
+ 'cwd': tmpdir
+ })
+ p_map = props['map'].value
+ assert p_map['cwd'] == tmpdir
+
+ def test_process_command_prefix(self, executor, workflow_context, tmpdir):
+ use_ctx = 'ctx node attributes map key = value'
+ python_script = ['import subprocess',
+ 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
+ python_script = '\n'.join(python_script)
+ script_path = self._create_script(
+ tmpdir,
+ linux_script=python_script,
+ windows_script=python_script,
+ windows_suffix='',
+ linux_suffix='')
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ process={
+ 'env': {'TEST_KEY': 'value'},
+ 'command_prefix': 'python'
+ })
+ p_map = props['map'].value
+ assert p_map['key'] == 'value'
+
+ def test_process_args(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes map arg1 = "$1"
+ ctx node attributes map arg2 = "$2"
+ ''',
+ windows_script='''
+ ctx node attributes map arg1 = %1
+ ctx node attributes map arg2 = %2
+ ''')
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ process={
+ 'args': ['"arg with spaces"', 'arg2']
+ })
+ assert props['map'].value['arg1'] == 'arg with spaces'
+ assert props['map'].value['arg2'] == 'arg2'
+
+ def test_no_script_path(self, executor, workflow_context):
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=None)
+ assert isinstance(exception, TaskAbortException)
+ assert 'script_path' in exception.message
+
+ def test_script_error(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ echo 123123
+ command_that_does_not_exist [ ]
+ ''',
+ windows_script='''
+ @echo off
+ echo 123123
+ command_that_does_not_exist [ ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, ProcessException)
+ assert os.path.basename(script_path) in exception.command
+ assert exception.exit_code == 1 if IS_WINDOWS else 127
+ assert exception.stdout.strip() == '123123'
+ assert 'command_that_does_not_exist' in exception.stderr
+
+ def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx property_that_does_not_exist
+ ''',
+ windows_script='''
+ ctx property_that_does_not_exist
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, ProcessException)
+ assert os.path.basename(script_path) in exception.command
+ assert exception.exit_code == 1
+ assert 'RequestError' in exception.stderr
+ assert 'property_that_does_not_exist' in exception.stderr
+
+ def test_python_script(self, executor, workflow_context, tmpdir):
+ script = '''
+from aria.orchestrator.execution_plugin import ctx, inputs
+if __name__ == '__main__':
+ ctx.node.attributes['key'] = inputs['key']
+'''
+ suffix = '.py'
+ script_path = self._create_script(
+ tmpdir,
+ linux_script=script,
+ windows_script=script,
+ linux_suffix=suffix,
+ windows_suffix=suffix)
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ arguments={'key': 'value'})
+ assert props['key'].value == 'value'
+
+ @pytest.mark.parametrize(
+ 'value', ['string-value', [1, 2, 3], 999, 3.14, False,
+ {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}])
+ def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes key = "${input_as_env_var}"
+ ''',
+ windows_script='''
+ ctx node attributes key = "%input_as_env_var%"
+ ''')
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ env_var=value)
+ value = props['key'].value
+ expected = value if isinstance(value, basestring) else json.loads(value)
+ assert expected == value
+
+ @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
+ def test_explicit_env_variables_inputs_override(
+ self, executor, workflow_context, tmpdir, value):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes key = "${input_as_env_var}"
+ ''',
+ windows_script='''
+ ctx node attributes key = "%input_as_env_var%"
+ ''')
+
+ props = self._run(
+ executor, workflow_context,
+ script_path=script_path,
+ env_var='test-value',
+ process={
+ 'env': {
+ 'input_as_env_var': value
+ }
+ })
+ value = props['key'].value
+ expected = value if isinstance(value, basestring) else json.loads(value)
+ assert expected == value
+
+ def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx node attributes nonexistent
+ ''',
+ windows_script='''
+ ctx node attributes nonexistent
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, ProcessException)
+ assert os.path.basename(script_path) in exception.command
+ assert 'RequestError' in exception.stderr
+ assert 'nonexistent' in exception.stderr
+
+ def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx -j node attributes nonexistent
+ ''',
+ windows_script='''
+ ctx -j node attributes nonexistent
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, ProcessException)
+ assert os.path.basename(script_path) in exception.command
+ assert 'RequestError' in exception.stderr
+ assert 'nonexistent' in exception.stderr
+
+ def test_abort(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx task abort [ abort-message ]
+ ''',
+ windows_script='''
+ ctx task abort [ abort-message ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == 'abort-message'
+
+ def test_retry(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx task retry [ retry-message ]
+ ''',
+ windows_script='''
+ ctx task retry [ retry-message ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskRetryException)
+ assert exception.message == 'retry-message'
+
+ def test_retry_with_interval(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx task retry [ retry-message @100 ]
+ ''',
+ windows_script='''
+ ctx task retry [ retry-message @100 ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskRetryException)
+ assert exception.message == 'retry-message'
+ assert exception.retry_interval == 100
+
+ def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash
+ ctx task retry [ retry-message ]
+ ctx task abort [ should-raise-a-runtime-error ]
+ ''',
+ windows_script='''
+ ctx task retry [ retry-message ]
+ ctx task abort [ should-raise-a-runtime-error ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+ def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash
+ ctx task abort [ abort-message ]
+ ctx task retry [ should-raise-a-runtime-error ]
+ ''',
+ windows_script='''
+ ctx task abort [ abort-message ]
+ ctx task retry [ should-raise-a-runtime-error ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+ def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash
+ ctx task abort [ abort-message ]
+ ctx task abort [ should-raise-a-runtime-error ]
+ ''',
+ windows_script='''
+ ctx task abort [ abort-message ]
+ ctx task abort [ should-raise-a-runtime-error ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+ def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir):
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash
+ ctx task retry [ retry-message ]
+ ctx task retry [ should-raise-a-runtime-error ]
+ ''',
+ windows_script='''
+ ctx task retry [ retry-message ]
+ ctx task retry [ should-raise-a-runtime-error ]
+ ''')
+ exception = self._run_and_get_task_exception(
+ executor, workflow_context,
+ script_path=script_path)
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
+ def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
+ log_path = tmpdir.join('temp.log')
+ message = 'message'
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx task retry [ "{0}" ] 2> {1}
+ echo should-not-run > {1}
+ '''.format(message, log_path),
+ windows_script='''
+ ctx task retry [ "{0}" ] 2> {1}
+ if %errorlevel% neq 0 exit /b %errorlevel%
+ echo should-not-run > {1}
+ '''.format(message, log_path))
+ with pytest.raises(ExecutorException):
+ self._run(
+ executor, workflow_context,
+ script_path=script_path)
+ assert log_path.read().strip() == message
+
+ def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
+ log_path = tmpdir.join('temp.log')
+ message = 'message'
+ script_path = self._create_script(
+ tmpdir,
+ linux_script='''#! /bin/bash -e
+ ctx task abort [ "{0}" ] 2> {1}
+ echo should-not-run > {1}
+ '''.format(message, log_path),
+ windows_script='''
+ ctx task abort [ "{0}" ] 2> {1}
+ if %errorlevel% neq 0 exit /b %errorlevel%
+ echo should-not-run > {1}
+ '''.format(message, log_path))
+ with pytest.raises(ExecutorException):
+ self._run(
+ executor, workflow_context,
+ script_path=script_path)
+ assert log_path.read().strip() == message
+
+ def _create_script(self,
+ tmpdir,
+ linux_script,
+ windows_script,
+ windows_suffix='.bat',
+ linux_suffix=''):
+ suffix = windows_suffix if IS_WINDOWS else linux_suffix
+ script = windows_script if IS_WINDOWS else linux_script
+ script_path = tmpdir.join('script{0}'.format(suffix))
+ script_path.write(script)
+ return str(script_path)
+
+ def _run_and_get_task_exception(self, *args, **kwargs):
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ with pytest.raises(ExecutorException):
+ self._run(*args, **kwargs)
+ return collected[signal][0]['kwargs']['exception']
+
+ def _run(self,
+ executor,
+ workflow_context,
+ script_path,
+ process=None,
+ env_var='value',
+ arguments=None):
+ local_script_path = script_path
+ script_path = os.path.basename(local_script_path) if local_script_path else ''
+ arguments = arguments or {}
+ process = process or {}
+ if script_path:
+ workflow_context.resource.service.upload(
+ entry_id=str(workflow_context.service.id),
+ source=local_script_path,
+ path=script_path)
+
+ arguments.update({
+ 'script_path': script_path,
+ 'process': process,
+ 'input_as_env_var': env_var
+ })
+
+ node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(
+ operations.__name__,
+ operations.run_script_locally.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ workflow_context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(ctx, graph):
+ graph.add_tasks(api.task.OperationTask(
+ node,
+ interface_name='test',
+ operation_name='op',
+ arguments=arguments))
+ return graph
+ tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(workflow_context)
+ return workflow_context.model.node.get_by_name(
+ mock.models.DEPENDENCY_NODE_NAME).attributes
+
+ @pytest.fixture
+ def executor(self):
+ result = process.ProcessExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @pytest.fixture
+ def workflow_context(self, tmpdir):
+ workflow_context = mock.context.simple(str(tmpdir), inmemory=False)
+ workflow_context.states = []
+ workflow_context.exception = None
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+
+class BaseTestConfiguration(object):
+
+ @pytest.fixture(autouse=True)
+ def mock_execute(self, mocker):
+ def eval_func(**_):
+ self.called = 'eval'
+
+ def execute_func(process, **_):
+ self.process = process
+ self.called = 'execute'
+ self.process = {}
+ self.called = None
+ mocker.patch.object(local, '_execute_func', execute_func)
+ mocker.patch.object(local, '_eval_script_func', eval_func)
+
+ class Ctx(object):
+ @staticmethod
+ def download_resource(destination, *args, **kwargs):
+ return destination
+
+ def _run(self, script_path, process=None):
+ local.run_script(
+ script_path=script_path,
+ process=process,
+ ctx=self.Ctx)
+
+
+class TestPowerShellConfiguration(BaseTestConfiguration):
+
+ def test_implicit_powershell_call_with_ps1_extension(self):
+ self._run(script_path='script_path.ps1')
+ assert self.process['command_prefix'] == 'powershell'
+
+ def test_command_prefix_is_overridden_for_ps1_extension(self):
+ self._run(script_path='script_path.ps1',
+ process={'command_prefix': 'bash'})
+ assert self.process['command_prefix'] == 'bash'
+
+ def test_explicit_powershell_call(self):
+ self._run(script_path='script_path.ps1',
+ process={'command_prefix': 'powershell'})
+ assert self.process['command_prefix'] == 'powershell'
+
+
+class TestEvalPythonConfiguration(BaseTestConfiguration):
+
+ def test_explicit_eval_without_py_extension(self):
+ self._run(script_path='script_path',
+ process={'eval_python': True})
+ assert self.called == 'eval'
+
+ def test_explicit_eval_with_py_extension(self):
+ self._run(script_path='script_path.py',
+ process={'eval_python': True})
+ assert self.called == 'eval'
+
+ def test_implicit_eval(self):
+ self._run(script_path='script_path.py')
+ assert self.called == 'eval'
+
+ def test_explicit_execute_without_py_extension(self):
+ self._run(script_path='script_path',
+ process={'eval_python': False})
+ assert self.called == 'execute'
+
+ def test_explicit_execute_with_py_extension(self):
+ self._run(script_path='script_path.py',
+ process={'eval_python': False})
+ assert self.called == 'execute'
+
+ def test_implicit_execute(self):
+ self._run(script_path='script_path')
+ assert self.called == 'execute'