path: root/azure/aria/aria-extension-cloudify/src/aria/
diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/')
1 files changed, 528 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/ b/azure/aria/aria-extension-cloudify/src/aria/
new file mode 100644
index 0000000..5256cf8
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/
@@ -0,0 +1,528 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import contextlib
+import json
+import logging
+import os
+import pytest
+import fabric.api
+from fabric.contrib import files
+from fabric import context_managers
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator import workflow
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
+from aria.orchestrator.execution_plugin import operations
+from aria.orchestrator.execution_plugin import constants
+from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException
+from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations
+from tests import mock, storage, resources
+from tests.orchestrator.workflows.helpers import events_collector
+_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx'
+import tests
+KEY_FILENAME = os.path.join(tests.ROOT_DIR, 'tests/resources/keys/test')
+ 'disable_known_hosts': True,
+ 'user': 'test',
+ 'key_filename': KEY_FILENAME
+import mockssh
+def server():
+ with mockssh.Server({'test': KEY_FILENAME}) as s:
+ yield s
+#@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required')
+class TestWithActualSSHServer(object):
+ def test_run_script_basic(self):
+ expected_attribute_value = 'some_value'
+ props = self._execute(env={'test_value': expected_attribute_value})
+ assert props['test_value'].value == expected_attribute_value
+ @pytest.mark.skip(reason='sudo privileges are required')
+ def test_run_script_as_sudo(self):
+ self._execute(use_sudo=True)
+ with self._ssh_env():
+ assert files.exists('/opt/test_dir')
+ fabric.api.sudo('rm -rf /opt/test_dir')
+ def test_run_script_default_base_dir(self):
+ props = self._execute()
+ assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
+ @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
+ @pytest.mark.parametrize('hide_groups', [[], ['everything']])
+ def test_run_script_with_hide(self, hide_groups):
+ self._execute(hide_output=hide_groups)
+ output = 'TODO'
+ expected_log_message = ('[localhost] run: source {0}/scripts/'
+ .format(constants.DEFAULT_BASE_DIR))
+ if hide_groups:
+ assert expected_log_message not in output
+ else:
+ assert expected_log_message in output
+ def test_run_script_process_config(self):
+ expected_env_value = 'test_value_env'
+ expected_arg1_value = 'test_value_arg1'
+ expected_arg2_value = 'test_value_arg2'
+ expected_cwd = '/tmp'
+ expected_base_dir = _CUSTOM_BASE_DIR
+ props = self._execute(
+ env={'test_value_env': expected_env_value},
+ process={
+ 'args': [expected_arg1_value, expected_arg2_value],
+ 'cwd': expected_cwd,
+ 'base_dir': expected_base_dir
+ })
+ assert props['env_value'].value == expected_env_value
+ assert len(props['bash_version'].value) > 0
+ assert props['arg1_value'].value == expected_arg1_value
+ assert props['arg2_value'].value == expected_arg2_value
+ assert props['cwd'].value == expected_cwd
+ assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
+ def test_run_script_command_prefix(self):
+ props = self._execute(process={'command_prefix': 'bash -i'})
+ assert 'i' in props['dollar_dash'].value
+ def test_run_script_reuse_existing_ctx(self):
+ expected_test_value_1 = 'test_value_1'
+ expected_test_value_2 = 'test_value_2'
+ props = self._execute(
+ test_operations=['{0}_1'.format(self.test_name),
+ '{0}_2'.format(self.test_name)],
+ env={'test_value1': expected_test_value_1,
+ 'test_value2': expected_test_value_2})
+ assert props['test_value1'].value == expected_test_value_1
+ assert props['test_value2'].value == expected_test_value_2
+ def test_run_script_download_resource_plain(self, tmpdir):
+ resource = tmpdir.join('resource')
+ resource.write('content')
+ self._upload(str(resource), 'test_resource')
+ props = self._execute()
+ assert props['test_value'].value == 'content'
+ def test_run_script_download_resource_and_render(self, tmpdir):
+ resource = tmpdir.join('resource')
+ resource.write('{{}}')
+ self._upload(str(resource), 'test_resource')
+ props = self._execute()
+ assert props['test_value'].value ==
+ @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
+ def test_run_script_inputs_as_env_variables_no_override(self, value):
+ props = self._execute(custom_input=value)
+ return_value = props['test_value'].value
+ expected = return_value if isinstance(value, basestring) else json.loads(return_value)
+ assert value == expected
+ @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
+ def test_run_script_inputs_as_env_variables_process_env_override(self, value):
+ props = self._execute(custom_input='custom-input-value',
+ env={'custom_env_var': value})
+ return_value = props['test_value'].value
+ expected = return_value if isinstance(value, basestring) else json.loads(return_value)
+ assert value == expected
+ def test_run_script_error_in_script(self):
+ exception = self._execute_and_get_task_exception()
+ assert isinstance(exception, TaskException)
+ def test_run_script_abort_immediate(self):
+ exception = self._execute_and_get_task_exception()
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == 'abort-message'
+ def test_run_script_retry(self):
+ exception = self._execute_and_get_task_exception()
+ assert isinstance(exception, TaskRetryException)
+ assert exception.message == 'retry-message'
+ def test_run_script_abort_error_ignored_by_script(self):
+ exception = self._execute_and_get_task_exception()
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == 'abort-message'
+ def test_run_commands(self):
+ temp_file_path = '/tmp/very_temporary_file'
+ with self._ssh_env():
+ if files.exists(temp_file_path):
+'rm {0}'.format(temp_file_path))
+ self._execute(commands=['touch {0}'.format(temp_file_path)])
+ with self._ssh_env():
+ assert files.exists(temp_file_path)
+'rm {0}'.format(temp_file_path))
+ @pytest.fixture(autouse=True)
+ def _setup(self, request, workflow_context, executor, capfd, server):
+ print 'HI!!!!!!!!!!', server.port
+ self._workflow_context = workflow_context
+ self._executor = executor
+ self._capfd = capfd
+ self.test_name = request.node.originalname or
+ with self._ssh_env(server):
+ for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]:
+ if files.exists(directory):
+'rm -rf {0}'.format(directory))
+ @contextlib.contextmanager
+ def _ssh_env(self, server):
+ with self._capfd.disabled():
+ with context_managers.settings(fabric.api.hide('everything'),
+ host_string='localhost:{0}'.format(server.port),
+ yield
+ def _execute(self,
+ env=None,
+ use_sudo=False,
+ hide_output=None,
+ process=None,
+ custom_input='',
+ test_operations=None,
+ commands=None):
+ process = process or {}
+ if env:
+ process.setdefault('env', {}).update(env)
+ test_operations = test_operations or [self.test_name]
+ local_script_path = os.path.join(resources.DIR, 'scripts', '')
+ script_path = os.path.basename(local_script_path)
+ self._upload(local_script_path, script_path)
+ if commands:
+ operation = operations.run_commands_with_ssh
+ else:
+ operation = operations.run_script_with_ssh
+ node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ arguments = {
+ 'script_path': script_path,
+ 'fabric_env': _FABRIC_ENV,
+ 'process': process,
+ 'use_sudo': use_sudo,
+ 'custom_env_var': custom_input,
+ 'test_operation': '',
+ }
+ if hide_output:
+ arguments['hide_output'] = hide_output
+ if commands:
+ arguments['commands'] = commands
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(
+ operations.__name__,
+ operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[] = interface
+ @workflow
+ def mock_workflow(ctx, graph):
+ ops = []
+ for test_operation in test_operations:
+ op_arguments = arguments.copy()
+ op_arguments['test_operation'] = test_operation
+ ops.append(api.task.OperationTask(
+ node,
+ interface_name='test',
+ operation_name='op',
+ arguments=op_arguments))
+ graph.sequence(*ops)
+ return graph
+ tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
+ graph_compiler.GraphCompiler(
+ self._workflow_context, self._executor.__class__).compile(tasks_graph)
+ eng = engine.Engine({self._executor.__class__: self._executor})
+ eng.execute(self._workflow_context)
+ return self._workflow_context.model.node.get_by_name(
+ mock.models.DEPENDENCY_NODE_NAME).attributes
+ def _execute_and_get_task_exception(self, *args, **kwargs):
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ with pytest.raises(ExecutorException):
+ self._execute(*args, **kwargs)
+ return collected[signal][0]['kwargs']['exception']
+ def _upload(self, source, path):
+ self._workflow_context.resource.service.upload(
+ entry_id=str(,
+ source=source,
+ path=path)
+ @pytest.fixture
+ def executor(self):
+ result = process.ProcessExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+ @pytest.fixture
+ def workflow_context(self, tmpdir):
+ workflow_context = mock.context.simple(str(tmpdir))
+ workflow_context.states = []
+ workflow_context.exception = None
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+class TestFabricEnvHideGroupsAndRunCommands(object):
+ def test_fabric_env_default_override(self):
+ # first sanity for no override
+ self._run()
+ assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout']
+ # now override
+ invocation_fabric_env = self.default_fabric_env.copy()
+ timeout = 1000000
+ invocation_fabric_env['timeout'] = timeout
+ self._run(fabric_env=invocation_fabric_env)
+ assert self.mock.settings_merged['timeout'] == timeout
+ def test_implicit_host_string(self, mocker):
+ expected_host_address = ''
+ mocker.patch.object(, 'host')
+ mocker.patch.object(, 'host_address', expected_host_address)
+ fabric_env = self.default_fabric_env.copy()
+ del fabric_env['host_string']
+ self._run(fabric_env=fabric_env)
+ assert self.mock.settings_merged['host_string'] == expected_host_address
+ def test_explicit_host_string(self):
+ fabric_env = self.default_fabric_env.copy()
+ host_string = 'explicit_host_string'
+ fabric_env['host_string'] = host_string
+ self._run(fabric_env=fabric_env)
+ assert self.mock.settings_merged['host_string'] == host_string
+ def test_override_warn_only(self):
+ fabric_env = self.default_fabric_env.copy()
+ self._run(fabric_env=fabric_env)
+ assert self.mock.settings_merged['warn_only'] is True
+ fabric_env = self.default_fabric_env.copy()
+ fabric_env['warn_only'] = False
+ self._run(fabric_env=fabric_env)
+ assert self.mock.settings_merged['warn_only'] is False
+ def test_missing_host_string(self):
+ with pytest.raises(TaskAbortException) as exc_ctx:
+ fabric_env = self.default_fabric_env.copy()
+ del fabric_env['host_string']
+ self._run(fabric_env=fabric_env)
+ assert '`host_string` not supplied' in str(exc_ctx.value)
+ def test_missing_user(self):
+ with pytest.raises(TaskAbortException) as exc_ctx:
+ fabric_env = self.default_fabric_env.copy()
+ del fabric_env['user']
+ self._run(fabric_env=fabric_env)
+ assert '`user` not supplied' in str(exc_ctx.value)
+ def test_missing_key_or_password(self):
+ with pytest.raises(TaskAbortException) as exc_ctx:
+ fabric_env = self.default_fabric_env.copy()
+ del fabric_env['key_filename']
+ self._run(fabric_env=fabric_env)
+ assert 'Access credentials not supplied' in str(exc_ctx.value)
+ def test_hide_in_settings_and_non_viable_groups(self):
+ groups = ('running', 'stdout')
+ self._run(hide_output=groups)
+ assert set(self.mock.settings_merged['hide_output']) == set(groups)
+ with pytest.raises(TaskAbortException) as exc_ctx:
+ self._run(hide_output=('running', 'bla'))
+ assert '`hide_output` must be a subset of' in str(exc_ctx.value)
+ def test_run_commands(self):
+ def test(use_sudo):
+ commands = ['command1', 'command2']
+ self._run(
+ commands=commands,
+ use_sudo=use_sudo)
+ assert all(item in self.mock.settings_merged.items() for
+ item in self.default_fabric_env.items())
+ assert self.mock.settings_merged['warn_only'] is True
+ assert self.mock.settings_merged['use_sudo'] == use_sudo
+ assert self.mock.commands == commands
+ self.mock.settings_merged = {}
+ self.mock.commands = []
+ test(use_sudo=False)
+ test(use_sudo=True)
+ def test_failed_command(self):
+ with pytest.raises(ProcessException) as exc_ctx:
+ self._run(commands=['fail'])
+ exception = exc_ctx.value
+ assert exception.stdout == self.MockCommandResult.stdout
+ assert exception.stderr == self.MockCommandResult.stderr
+ assert exception.command == self.MockCommandResult.command
+ assert exception.exit_code == self.MockCommandResult.return_code
+ class MockCommandResult(object):
+ stdout = 'mock_stdout'
+ stderr = 'mock_stderr'
+ command = 'mock_command'
+ return_code = 1
+ def __init__(self, failed):
+ self.failed = failed
+ class MockFabricApi(object):
+ def __init__(self):
+ self.commands = []
+ self.settings_merged = {}
+ @contextlib.contextmanager
+ def settings(self, *args, **kwargs):
+ self.settings_merged.update(kwargs)
+ if args:
+ groups = args[0]
+ self.settings_merged.update({'hide_output': groups})
+ yield
+ def run(self, command):
+ self.commands.append(command)
+ self.settings_merged['use_sudo'] = False
+ return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
+ def sudo(self, command):
+ self.commands.append(command)
+ self.settings_merged['use_sudo'] = True
+ return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
+ def hide(self, *groups):
+ return groups
+ def exists(self, *args, **kwargs):
+ raise RuntimeError
+ class _Ctx(object):
+ class Task(object):
+ @staticmethod
+ def abort(message=None):
+ models.Task.abort(message)
+ actor = None
+ class Actor(object):
+ host = None
+ class Model(object):
+ @contextlib.contextmanager
+ def instrument(self, *args, **kwargs):
+ yield
+ task = Task
+ = Actor
+ model = Model()
+ logger = logging.getLogger()
+ @staticmethod
+ @contextlib.contextmanager
+ def _mock_self_logging(*args, **kwargs):
+ yield
+ _Ctx.logging_handlers = _mock_self_logging
+ @pytest.fixture(autouse=True)
+ def _setup(self, mocker):
+ self.default_fabric_env = {
+ 'host_string': 'test',
+ 'user': 'test',
+ 'key_filename': 'test',
+ }
+ self.mock = self.MockFabricApi()
+ mocker.patch('fabric.api', self.mock)
+ def _run(self,
+ commands=(),
+ fabric_env=None,
+ process=None,
+ use_sudo=False,
+ hide_output=None):
+ operations.run_commands_with_ssh(
+ ctx=self._Ctx,
+ commands=commands,
+ process=process,
+ fabric_env=fabric_env or self.default_fabric_env,
+ use_sudo=use_sudo,
+ hide_output=hide_output)
+class TestUtilityFunctions(object):
+ def test_paths(self):
+ base_dir = '/path'
+ local_script_path = '/local/script/'
+ paths = ssh_operations._Paths(base_dir=base_dir,
+ local_script_path=local_script_path)
+ assert paths.local_script_path == local_script_path
+ assert paths.remote_ctx_dir == base_dir
+ assert paths.base_script_path == ''
+ assert paths.remote_ctx_path == '/path/ctx'
+ assert paths.remote_scripts_dir == '/path/scripts'
+ assert paths.remote_work_dir == '/path/work'
+ assert paths.remote_env_script_path.startswith('/path/scripts/')
+ assert paths.remote_script_path.startswith('/path/scripts/')
+ def test_write_environment_script_file(self):
+ base_dir = '/path'
+ local_script_path = '/local/script/'
+ paths = ssh_operations._Paths(base_dir=base_dir,
+ local_script_path=local_script_path)
+ env = {'one': "'1'"}
+ local_socket_url = 'local_socket_url'
+ remote_socket_url = 'remote_socket_url'
+ env_script_lines = set([l for l in ssh_operations._write_environment_script_file(
+ process={'env': env},
+ paths=paths,
+ local_socket_url=local_socket_url,
+ remote_socket_url=remote_socket_url
+ ).getvalue().split('\n') if l])
+ expected_env_script_lines = set([
+ 'export PATH=/path:$PATH',
+ 'export PYTHONPATH=/path:$PYTHONPATH',
+ 'chmod +x /path/ctx',
+ 'chmod +x {0}'.format(paths.remote_script_path),
+ 'export CTX_SOCKET_URL={0}'.format(remote_socket_url),
+ 'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url),
+ 'export one=\'1\''
+ ])
+ assert env_script_lines == expected_env_script_lines