summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py')
-rw-r--r--azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py541
1 files changed, 541 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py b/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
new file mode 100644
index 0000000..267f211
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/aria_extension_tests/adapters/test_context_adapter.py
@@ -0,0 +1,541 @@
+#
+# Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import copy
+import datetime
+import contextlib
+
+import pytest
+
+from aria import (workflow, operation)
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import (engine, graph_compiler)
+from aria.orchestrator.exceptions import (TaskAbortException, TaskRetryException)
+from aria.utils import type as type_
+
+import tests
+from tests import (mock, storage, conftest)
+from tests.orchestrator.workflows.helpers import events_collector
+
+from adapters import context_adapter
+
+
+@pytest.fixture(autouse=True)
+def cleanup_logger(request):
+ conftest.logging_handler_cleanup(request)
+
+
+class TestCloudifyContextAdapter(object):
+
+ def test_node_instance_operation(self, executor, workflow_context):
+ node_template = self._get_node_template(workflow_context)
+ node_type = 'aria.plugin.nodes.App'
+ node_instance_property = models.Property.wrap('hello', 'world')
+ node_template.type = models.Type(variant='variant', name=node_type)
+ node = self._get_node(workflow_context)
+ node_instance_attribute = models.Attribute.wrap('hello2', 'world2')
+ node.attributes[node_instance_attribute.name] = node_instance_attribute
+ node.properties[node_instance_property.name] = node_instance_property
+ workflow_context.model.node.update(node)
+ workflow_context.model.node_template.update(node_template)
+
+ out = self._run(executor, workflow_context, _test_node_instance_operation)
+
+ node_template = self._get_node_template(workflow_context)
+ node = self._get_node(workflow_context)
+ assert out['type'] == context_adapter.NODE_INSTANCE
+ assert out['node']['id'] == node_template.id
+ assert out['node']['name'] == node_template.name
+ assert out['node']['properties'] == \
+ {node_instance_property.name: node_instance_property.value}
+ assert out['node']['type'] == node_type
+ assert out['node']['type_hierarchy'] == ['cloudify.plugin.nodes.App']
+ assert out['instance']['id'] == node.id
+ assert out['instance']['runtime_properties'] == \
+ {node_instance_attribute.name: node_instance_attribute.value}
+ assert not out['source']
+ assert not out['target']
+
+ def test_node_instance_relationships(self, executor, workflow_context):
+ relationship_node_template = self._get_dependency_node_template(workflow_context)
+ relationship_node_instance = self._get_dependency_node(workflow_context)
+ relationship = relationship_node_instance.inbound_relationships[0]
+ relationship_type = models.Type(variant='variant', name='test.relationships.Relationship')
+ relationship.type = relationship_type
+ workflow_context.model.relationship.update(relationship)
+
+ out = self._run(executor, workflow_context, _test_node_instance_relationships)
+
+ assert len(out['instance']['relationships']) == 1
+ relationship = out['instance']['relationships'][0]
+ assert relationship['type'] == relationship_type.name
+ assert relationship['type_hierarchy'] == [relationship_type.name]
+ assert relationship['target']['node']['id'] == relationship_node_template.id
+ assert relationship['target']['instance']['id'] == relationship_node_instance.id
+
+ def test_source_operation(self, executor, workflow_context):
+ self._test_relationship_operation(executor, workflow_context, operation_end='source')
+
+ def test_target_operation(self, executor, workflow_context):
+ self._test_relationship_operation(executor, workflow_context, operation_end='target')
+
+ def _test_relationship_operation(self, executor, workflow_context, operation_end):
+ out = self._run(
+ executor, workflow_context, _test_relationship_operation, operation_end=operation_end)
+
+ source_node = self._get_node_template(workflow_context)
+ source_node_instance = self._get_node(workflow_context)
+ target_node = self._get_dependency_node_template(workflow_context)
+ target_node_instance = self._get_dependency_node(workflow_context)
+ assert out['type'] == context_adapter.RELATIONSHIP_INSTANCE
+ assert out['source']['node']['id'] == source_node.id
+ assert out['source']['instance']['id'] == source_node_instance.id
+ assert out['target']['node']['id'] == target_node.id
+ assert out['target']['instance']['id'] == target_node_instance.id
+ assert not out['node']
+ assert not out['instance']
+
+ def test_host_ip(self, executor, workflow_context):
+ node = self._get_node_template(workflow_context)
+ node.type_hierarchy = ['aria.nodes.Compute']
+ node_instance = self._get_node(workflow_context)
+ node_instance.host_fk = node_instance.id
+ node_instance_ip = '120.120.120.120'
+ node_instance.attributes['ip'] = models.Attribute.wrap('ip', node_instance_ip)
+ workflow_context.model.node_template.update(node)
+ workflow_context.model.node.update(node_instance)
+
+ out = self._run(executor, workflow_context, _test_host_ip)
+
+ assert out['instance']['host_ip'] == node_instance_ip
+
+ def test_get_and_download_resource_and_render(self, tmpdir, executor, workflow_context):
+ resource_path = 'resource'
+ variable = 'VALUE'
+ content = '{{ctx.service.name}}-{{variable}}'
+ rendered = '{0}-{1}'.format(workflow_context.service.name, variable)
+ source = tmpdir.join(resource_path)
+ source.write(content)
+ workflow_context.resource.service.upload(
+ entry_id=str(workflow_context.service.id),
+ source=str(source),
+ path=resource_path)
+
+ out = self._run(executor, workflow_context, _test_get_and_download_resource_and_render,
+ inputs={'resource': resource_path,
+ 'variable': variable})
+
+ assert out['get_resource'] == content
+ assert out['get_resource_and_render'] == rendered
+ with open(out['download_resource'], 'rb') as f:
+ assert f.read() == content
+ with open(out['download_resource_and_render'], 'rb') as f:
+ assert f.read() == rendered
+
+ os.remove(out['download_resource'])
+ os.remove(out['download_resource_and_render'])
+
+ def test_retry(self, executor, workflow_context):
+ message = 'retry-message'
+ retry_interval = 0.01
+
+ exception = self._run_and_get_task_exceptions(
+ executor, workflow_context, _test_retry,
+ inputs={'message': message, 'retry_interval': retry_interval},
+ max_attempts=2
+ )[-1]
+
+ assert isinstance(exception, TaskRetryException)
+ assert exception.message == message
+ assert exception.retry_interval == retry_interval
+
+ out = self._get_node(workflow_context).attributes['out'].value
+ assert out['operation']['retry_number'] == 1
+ assert out['operation']['max_retries'] == 1
+
+ def test_logger_and_send_event(self, executor, workflow_context):
+ # TODO: add assertions of output once process executor output can be captured
+ message = 'logger-message'
+ event = 'event-message'
+ self._run(executor, workflow_context, _test_logger_and_send_event,
+ inputs={'message': message, 'event': event})
+
+ def test_plugin(self, executor, workflow_context, tmpdir):
+ plugin = self._put_plugin(workflow_context)
+ out = self._run(executor, workflow_context, _test_plugin, plugin=plugin)
+
+ expected_workdir = tmpdir.join(
+ 'workdir', 'plugins', str(workflow_context.service.id), plugin.name)
+ assert out['plugin']['name'] == plugin.name
+ assert out['plugin']['package_name'] == plugin.package_name
+ assert out['plugin']['package_version'] == plugin.package_version
+ assert out['plugin']['workdir'] == str(expected_workdir)
+
+ def test_importable_ctx_and_inputs(self, executor, workflow_context):
+ test_inputs = {'input1': 1, 'input2': 2}
+ plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+ out = self._run(executor, workflow_context, _test_importable_ctx_and_inputs,
+ inputs=test_inputs,
+ skip_common_assert=True,
+ plugin=plugin)
+ assert out['inputs'] == test_inputs
+
+ def test_non_recoverable_error(self, executor, workflow_context):
+ message = 'NON_RECOVERABLE_MESSAGE'
+ plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+ exception = self._run_and_get_task_exceptions(
+ executor, workflow_context, _test_non_recoverable_error,
+ inputs={'message': message},
+ skip_common_assert=True,
+ plugin=plugin
+ )[0]
+ assert isinstance(exception, TaskAbortException)
+ assert exception.message == message
+
+ def test_recoverable_error(self, executor, workflow_context):
+ message = 'RECOVERABLE_MESSAGE'
+ plugin = self._put_plugin(workflow_context, mock_cfy_plugin=True)
+
+ retry_interval = 0.01
+ exception = self._run_and_get_task_exceptions(
+ executor, workflow_context, _test_recoverable_error,
+ inputs={'message': message, 'retry_interval': retry_interval},
+ skip_common_assert=True,
+ plugin=plugin
+ )[0]
+ assert isinstance(exception, TaskRetryException)
+ assert message in exception.message
+ assert exception.retry_interval == retry_interval
+
+ def _test_common(self, out, workflow_context):
+ assert out['execution_id'] == workflow_context.execution.id
+ assert out['workflow_id'] == workflow_context.execution.workflow_name
+ assert out['rest_token'] is None
+ assert out['task_id'][0] == out['task_id'][1]
+ assert out['task_name'][0] == out['task_name'][1]
+ assert out['task_target'] is None
+ assert out['task_queue'] is None
+ assert out['provider_context'] == {}
+ assert out['blueprint']['id'] == workflow_context.service_template.id
+ assert out['deployment']['id'] == workflow_context.service.id
+ assert out['operation']['name'][0] == out['operation']['name'][1]
+ assert out['operation']['retry_number'][0] == out['operation']['retry_number'][1]
+ assert out['operation']['max_retries'][0] == out['operation']['max_retries'][1] - 1
+ assert out['bootstrap_context']['resources_prefix'] == ''
+ assert out['bootstrap_context']['broker_config'] == {}
+ assert out['bootstrap_context']['cloudify_agent']['any'] is None
+ assert out['agent']['init_script'] is None
+
+ def _run(self,
+ executor,
+ workflow_context,
+ func,
+ inputs=None,
+ max_attempts=None,
+ skip_common_assert=False,
+ operation_end=None,
+ plugin=None):
+ interface_name = 'test'
+ operation_name = 'op'
+ op_dict = {'function': '{0}.{1}'.format(__name__, func.__name__),
+ 'plugin': plugin,
+ 'arguments': inputs or {}}
+ node = self._get_node(workflow_context)
+
+ if operation_end:
+ actor = relationship = node.outbound_relationships[0]
+ relationship.interfaces[interface_name] = mock.models.create_interface(
+ relationship.source_node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=op_dict
+ )
+ workflow_context.model.relationship.update(relationship)
+
+ else:
+ actor = node
+ node.interfaces[interface_name] = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=op_dict
+ )
+ workflow_context.model.node.update(node)
+
+ if inputs:
+ operation_inputs = \
+ actor.interfaces[interface_name].operations[operation_name].inputs
+ for input_name, input in inputs.iteritems():
+ operation_inputs[input_name] = models.Input(name=input_name,
+ type_name=type_.full_type_name(input))
+
+ @workflow
+ def mock_workflow(graph, **kwargs):
+ task = api.task.OperationTask(
+ actor,
+ interface_name,
+ operation_name,
+ arguments=inputs or {},
+ max_attempts=max_attempts
+ )
+ graph.add_tasks(task)
+
+ tasks_graph = mock_workflow(ctx=workflow_context)
+ graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+ eng = engine.Engine(executors={executor.__class__: executor})
+ eng.execute(workflow_context)
+ out = self._get_node(workflow_context).attributes['out'].value
+ if not skip_common_assert:
+ self._test_common(out, workflow_context)
+ return out
+
+ def _get_dependency_node_template(self, workflow_context):
+ return workflow_context.model.node_template.get_by_name(
+ mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
+
+ def _get_dependency_node(self, workflow_context):
+ return workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ def _get_node_template(self, workflow_context):
+ return workflow_context.model.node_template.get_by_name(
+ mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
+
+ def _get_node(self, workflow_context):
+ return workflow_context.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+ def _run_and_get_task_exceptions(self, *args, **kwargs):
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ with pytest.raises(ExecutorException):
+ self._run(*args, **kwargs)
+ return [event['kwargs']['exception'] for event in collected[signal]]
+
+ @pytest.fixture
+ def executor(self):
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ yield result
+ result.close()
+
+ @pytest.fixture
+ def workflow_context(self, tmpdir):
+ result = mock.context.simple(
+ str(tmpdir),
+ context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+ )
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+ def _put_plugin(self, workflow_context, mock_cfy_plugin=False):
+ name = 'PLUGIN'
+ archive_name = 'ARCHIVE'
+ package_name = 'PACKAGE'
+ package_version = '0.1.1'
+
+ plugin = models.Plugin(
+ name=name,
+ archive_name=archive_name,
+ package_name=package_name,
+ package_version=package_version,
+ uploaded_at=datetime.datetime.now(),
+ wheels=['cloudify_plugins_common'] if mock_cfy_plugin else []
+ )
+
+ workflow_context.model.plugin.put(plugin)
+
+ return plugin
+
+
+@operation
+def _test_node_instance_operation(ctx):
+ with _adapter(ctx) as (adapter, out):
+ node = adapter.node
+ instance = adapter.instance
+ out.update({
+ 'node': {
+ 'id': node.id,
+ 'name': node.name,
+ 'properties': copy.deepcopy(node.properties),
+ 'type': node.type,
+ 'type_hierarchy': node.type_hierarchy
+ },
+ 'instance': {
+ 'id': instance.id,
+ 'runtime_properties': copy.deepcopy(instance.runtime_properties)
+ }
+ })
+ try:
+ assert adapter.source
+ out['source'] = True
+ except TaskAbortException:
+ out['source'] = False
+ try:
+ assert adapter.target
+ out['target'] = True
+ except TaskAbortException:
+ out['target'] = False
+
+
+@operation
+def _test_node_instance_relationships(ctx):
+ with _adapter(ctx) as (adapter, out):
+ relationships = [{'type': r.type,
+ 'type_hierarchy': [t.name for t in r.type_hierarchy],
+ 'target': {'node': {'id': r.target.node.id},
+ 'instance': {'id': r.target.instance.id}}}
+ for r in adapter.instance.relationships]
+ out['instance'] = {'relationships': relationships}
+
+
+@operation
+def _test_relationship_operation(ctx):
+ with _adapter(ctx) as (adapter, out):
+ out.update({
+ 'source': {'node': {'id': adapter.source.node.id},
+ 'instance': {'id': adapter.source.instance.id}},
+ 'target': {'node': {'id': adapter.target.node.id},
+ 'instance': {'id': adapter.target.instance.id}}
+ })
+ try:
+ assert adapter.node
+ out['node'] = True
+ except TaskAbortException:
+ out['node'] = False
+ try:
+ assert adapter.instance
+ out['instance'] = True
+ except TaskAbortException:
+ out['instance'] = False
+
+
+@operation
+def _test_host_ip(ctx):
+ with _adapter(ctx) as (adapter, out):
+ out['instance'] = {'host_ip': adapter.instance.host_ip}
+
+
+@operation
+def _test_get_and_download_resource_and_render(ctx, resource, variable):
+ with _adapter(ctx) as (adapter, out):
+ out.update({
+ 'get_resource': adapter.get_resource(resource),
+ 'get_resource_and_render': adapter.get_resource_and_render(
+ resource, template_variables={'variable': variable}
+ ),
+ 'download_resource': adapter.download_resource(resource),
+ 'download_resource_and_render': adapter.download_resource_and_render(
+ resource, template_variables={'variable': variable}
+ )
+ })
+
+
+@operation
+def _test_retry(ctx, message, retry_interval):
+ with _adapter(ctx) as (adapter, out):
+ op = adapter.operation
+ out['operation'] = {'retry_number': op.retry_number, 'max_retries': op.max_retries}
+ op.retry(message, retry_after=retry_interval)
+
+
+@operation
+def _test_logger_and_send_event(ctx, message, event):
+ with _adapter(ctx) as (adapter, _):
+ adapter.logger.info(message)
+ adapter.send_event(event)
+
+
+@operation
+def _test_plugin(ctx):
+ with _adapter(ctx) as (adapter, out):
+ plugin = adapter.plugin
+ out['plugin'] = {
+ 'name': plugin.name,
+ 'package_name': plugin.package_name,
+ 'package_version': plugin.package_version,
+ 'workdir': plugin.workdir
+ }
+
+
+@operation
+def _test_importable_ctx_and_inputs(**_):
+ from cloudify import ctx
+ from cloudify.state import ctx_parameters
+ ctx.instance.runtime_properties['out'] = {'inputs': dict(ctx_parameters)}
+
+
+@operation
+def _test_non_recoverable_error(message, **_):
+ from cloudify.exceptions import NonRecoverableError
+ raise NonRecoverableError(message)
+
+
+@operation
+def _test_recoverable_error(message, retry_interval, **_):
+ from cloudify.exceptions import RecoverableError
+ raise RecoverableError(message, retry_interval)
+
+
+def _test_common(out, ctx, adapter):
+ op = adapter.operation
+ bootstrap_context = adapter.bootstrap_context
+ out.update({
+ 'type': adapter.type,
+ 'execution_id': adapter.execution_id,
+ 'workflow_id': adapter.workflow_id,
+ 'rest_token': adapter.rest_token,
+ 'task_id': (adapter.task_id, ctx.task.id),
+ 'task_name': (adapter.task_name, ctx.task.function),
+ 'task_target': adapter.task_target,
+ 'task_queue': adapter.task_queue,
+ 'provider_context': adapter.provider_context,
+ 'blueprint': {'id': adapter.blueprint.id},
+ 'deployment': {'id': adapter.deployment.id},
+ 'operation': {
+ 'name': [op.name, ctx.name.split('@')[0].replace(':', '.')],
+ 'retry_number': [op.retry_number, ctx.task.attempts_count - 1],
+ 'max_retries': [op.max_retries, ctx.task.max_attempts]
+ },
+ 'bootstrap_context': {
+ 'broker_config': bootstrap_context.broker_config('arg1', 'arg2', arg3='arg3'),
+ # All attribute access of cloudify_agent returns none
+ 'cloudify_agent': {'any': bootstrap_context.cloudify_agent.any},
+ 'resources_prefix': bootstrap_context.resources_prefix
+ },
+ 'agent': {
+ 'init_script': adapter.agent.init_script('arg1', 'arg2', arg3='arg3')
+ }
+ })
+
+
+@contextlib.contextmanager
+def _adapter(ctx):
+ out = {}
+ adapter = context_adapter.CloudifyContextAdapter(ctx)
+ _test_common(out, ctx, adapter)
+ try:
+ yield adapter, out
+ finally:
+ try:
+ instance = adapter.instance
+ except TaskAbortException:
+ instance = adapter.source.instance
+ instance.runtime_properties['out'] = out