summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py126
1 files changed, 126 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py
new file mode 100644
index 0000000..6d53c2a
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/tests/orchestrator/context/test_workflow.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+import pytest
+
+from aria import application_model_storage, workflow
+from aria.orchestrator import context
+from aria.storage import sql_mapi
+from aria.orchestrator.workflows.executor import thread, process
+
+from tests import storage as test_storage, ROOT_DIR
+from ... import mock
+from . import execute
+
+
+class TestWorkflowContext(object):
+
+ def test_execution_creation_on_workflow_context_creation(self, storage):
+ ctx = self._create_ctx(storage)
+ execution = storage.execution.get(ctx.execution.id) # pylint: disable=no-member
+ assert execution.service == storage.service.get_by_name(
+ mock.models.SERVICE_NAME)
+ assert execution.workflow_name == mock.models.WORKFLOW_NAME
+ assert execution.service_template == storage.service_template.get_by_name(
+ mock.models.SERVICE_TEMPLATE_NAME)
+ assert execution.status == storage.execution.model_cls.PENDING
+ assert execution.inputs == {}
+ assert execution.created_at <= datetime.utcnow()
+
+ def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
+ self._create_ctx(storage)
+ self._create_ctx(storage)
+
+ @staticmethod
+ def _create_ctx(storage):
+ """
+
+ :param storage:
+ :return WorkflowContext:
+ """
+ service = storage.service.get_by_name(mock.models.SERVICE_NAME)
+ return context.workflow.WorkflowContext(
+ name='simple_context',
+ model_storage=storage,
+ resource_storage=None,
+ service_id=service,
+ execution_id=storage.execution.list(filters=dict(service=service))[0].id,
+ workflow_name=mock.models.WORKFLOW_NAME,
+ task_max_attempts=mock.models.TASK_MAX_ATTEMPTS,
+ task_retry_interval=mock.models.TASK_RETRY_INTERVAL
+ )
+
+ @pytest.fixture
+ def storage(self):
+ workflow_storage = application_model_storage(
+ sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
+ workflow_storage.service_template.put(mock.models.create_service_template())
+ service_template = workflow_storage.service_template.get_by_name(
+ mock.models.SERVICE_TEMPLATE_NAME)
+ service = mock.models.create_service(service_template)
+ workflow_storage.service.put(service)
+ workflow_storage.execution.put(mock.models.create_execution(service))
+ yield workflow_storage
+ test_storage.release_sqlite_storage(workflow_storage)
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(
+ str(tmpdir),
+ context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+ )
+ yield context
+ test_storage.release_sqlite_storage(context.model)
+
+
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {}),
+ (process.ProcessExecutor, {'python_path': [ROOT_DIR]}),
+])
+def executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+def test_attribute_consumption(ctx, executor):
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.attributes['key'] = ctx.model.attribute.model_cls.wrap('key', 'value')
+ node.attributes['key2'] = ctx.model.attribute.model_cls.wrap('key2', 'value_to_change')
+ ctx.model.node.update(node)
+
+ assert node.attributes['key'].value == 'value'
+ assert node.attributes['key2'].value == 'value_to_change'
+
+ @workflow
+ def basic_workflow(ctx, **_):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.attributes['new_key'] = 'new_value'
+ node.attributes['key2'] = 'changed_value'
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+ assert len(node.attributes) == 3
+ assert node.attributes['key'].value == 'value'
+ assert node.attributes['new_key'].value == 'new_value'
+ assert node.attributes['key2'].value == 'changed_value'