Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows')
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py | 21
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py | 20
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py | 272
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py | 295
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py | 36
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py | 101
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py | 179
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py | 34
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py | 31
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py | 31
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py | 34
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py | 149
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py | 20
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py | 185
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py | 170
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py | 118
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py | 85
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py | 91
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py | 22
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py | 75
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/celery.py | 97
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py | 54
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py | 350
-rw-r--r--  azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py | 79
24 files changed, 2549 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py
new file mode 100644
index 0000000..1f6c368
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflows package.
+"""
+
+# Import required so that logging signals are registered
+from . import events_logging
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py
new file mode 100644
index 0000000..587eee3
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow API.
+"""
+
+from . import task, task_graph
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py
new file mode 100644
index 0000000..6ce4a00
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides the tasks to be entered into the task graph.
+"""
+
+from ... import context
+from ....modeling import models
+from ....modeling import utils as modeling_utils
+from ....utils.uuid import generate_uuid
+from .. import exceptions
+
+
+class BaseTask(object):
+ """
+ Base class for tasks.
+ """
+
+ def __init__(self, ctx=None, **kwargs):
+ if ctx is not None:
+ self._workflow_context = ctx
+ else:
+ self._workflow_context = context.workflow.current.get()
+ self._id = generate_uuid(variant='uuid')
+
+ @property
+ def id(self):
+ """
+ UUID4 ID.
+ """
+ return self._id
+
+ @property
+ def workflow_context(self):
+ """
+ Context of the current workflow.
+ """
+ return self._workflow_context
+
+
+class OperationTask(BaseTask):
+ """
+ Executes an operation.
+
+ :ivar name: formatted name (includes actor type, actor name, and interface/operation names)
+ :vartype name: basestring
+ :ivar actor: node or relationship
+ :vartype actor: :class:`~aria.modeling.models.Node` or
+ :class:`~aria.modeling.models.Relationship`
+ :ivar interface_name: interface name on actor
+ :vartype interface_name: basestring
+ :ivar operation_name: operation name on interface
+ :vartype operation_name: basestring
+ :ivar plugin: plugin (or None for default plugin)
+ :vartype plugin: :class:`~aria.modeling.models.Plugin`
+ :ivar function: path to Python function
+ :vartype function: basestring
+ :ivar arguments: arguments to send to Python function
+ :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`}
+ :ivar ignore_failure: whether to ignore failures
+ :vartype ignore_failure: bool
+ :ivar max_attempts: maximum number of attempts allowed in case of failure
+ :vartype max_attempts: int
+ :ivar retry_interval: interval between retries (in seconds)
+ :vartype retry_interval: float
+ """
+
+ NAME_FORMAT = '{interface}:{operation}@{type}:{name}'
+
+ def __init__(self,
+ actor,
+ interface_name,
+ operation_name,
+ arguments=None,
+ ignore_failure=None,
+ max_attempts=None,
+ retry_interval=None):
+ """
+ :param actor: node or relationship
+ :type actor: :class:`~aria.modeling.models.Node` or
+ :class:`~aria.modeling.models.Relationship`
+ :param interface_name: interface name on actor
+ :type interface_name: basestring
+ :param operation_name: operation name on interface
+ :type operation_name: basestring
+ :param arguments: override argument values
+ :type arguments: {:obj:`basestring`: object}
+ :param ignore_failure: override whether to ignore failures
+ :type ignore_failure: bool
+ :param max_attempts: override maximum number of attempts allowed in case of failure
+ :type max_attempts: int
+ :param retry_interval: override interval between retries (in seconds)
+ :type retry_interval: float
+ :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if
+ ``interface_name`` and ``operation_name`` do not refer to an operation on the actor
+ """
+
+ # Creating OperationTask directly should raise an error when there is no
+ # interface/operation.
+ if not has_operation(actor, interface_name, operation_name):
+ raise exceptions.OperationNotFoundException(
+ 'Could not find operation "{operation_name}" on interface '
+ '"{interface_name}" for {actor_type} "{actor.name}"'.format(
+ operation_name=operation_name,
+ interface_name=interface_name,
+ actor_type=type(actor).__name__.lower(),
+ actor=actor)
+ )
+
+ super(OperationTask, self).__init__()
+
+ self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
+ name=actor.name,
+ interface=interface_name,
+ operation=operation_name)
+ self.actor = actor
+ self.interface_name = interface_name
+ self.operation_name = operation_name
+ self.ignore_failure = \
+ self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
+ self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+ self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+
+ operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+ self.plugin = operation.plugin
+ self.function = operation.function
+ self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments)
+
+ actor = self.actor
+ if hasattr(actor, '_wrapped'):
+ # Unwrap instrumented model
+ actor = actor._wrapped
+
+ if isinstance(actor, models.Node):
+ self._context_cls = context.operation.NodeOperationContext
+ elif isinstance(actor, models.Relationship):
+ self._context_cls = context.operation.RelationshipOperationContext
+ else:
+ raise exceptions.TaskCreationException('Could not create valid context for '
+ '{actor.__class__}'.format(actor=actor))
+
+ def __repr__(self):
+ return self.name
+
+
+class StubTask(BaseTask):
+ """
+ Enables creating empty tasks.
+ """
+
+
+class WorkflowTask(BaseTask):
+ """
+ Executes a complete workflow.
+ """
+
+ def __init__(self, workflow_func, **kwargs):
+ """
+ :param workflow_func: function to run
+ :param kwargs: kwargs that would be passed to the workflow_func
+ """
+ super(WorkflowTask, self).__init__(**kwargs)
+ kwargs['ctx'] = self.workflow_context
+ self._graph = workflow_func(**kwargs)
+
+ @property
+ def graph(self):
+ """
+ Graph constructed by the sub workflow.
+ """
+ return self._graph
+
+ def __getattr__(self, item):
+ try:
+ return getattr(self._graph, item)
+ except AttributeError:
+ return super(WorkflowTask, self).__getattribute__(item)
+
+
+def create_task(actor, interface_name, operation_name, **kwargs):
+ """
+ Helper function that enables safe creation of :class:`OperationTask`. If the supplied interface
+ or operation do not exist, ``None`` is returned.
+
+ :param actor: actor for this task
+ :param interface_name: name of the interface
+ :param operation_name: name of the operation
+ :param kwargs: any additional kwargs to be passed to the OperationTask
+    :return: OperationTask or None (if the interface/operation does not exist)
+ """
+ try:
+ return OperationTask(
+ actor,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ **kwargs
+ )
+ except exceptions.OperationNotFoundException:
+ return None
+
+
+def create_relationships_tasks(
+ node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
+ """
+    Creates relationship tasks (source and target) for all of a node's relationships.
+
+    :param node: source node
+    :param basestring interface_name: name of the interface
+    :param source_operation_name: source-side relationship operation name
+    :param target_operation_name: target-side relationship operation name
+ """
+ sub_tasks = []
+ for relationship in node.outbound_relationships:
+ relationship_operations = create_relationship_tasks(
+ relationship,
+ interface_name,
+ source_operation_name=source_operation_name,
+ target_operation_name=target_operation_name,
+ **kwargs)
+ sub_tasks.append(relationship_operations)
+ return sub_tasks
+
+
+def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
+ target_operation_name=None, **kwargs):
+ """
+ Creates a relationship task (source and target).
+
+ :param relationship: relationship instance itself
+    :param interface_name: name of the interface
+    :param source_operation_name: source-side relationship operation name
+    :param target_operation_name: target-side relationship operation name
+ """
+ operations = []
+ if source_operation_name:
+ operations.append(
+ create_task(
+ relationship,
+ interface_name=interface_name,
+ operation_name=source_operation_name,
+ **kwargs
+ )
+ )
+ if target_operation_name:
+ operations.append(
+ create_task(
+ relationship,
+ interface_name=interface_name,
+ operation_name=target_operation_name,
+ **kwargs
+ )
+ )
+
+ return [o for o in operations if o]
+
+
+def has_operation(actor, interface_name, operation_name):
+ interface = actor.interfaces.get(interface_name, None)
+ return interface and interface.operations.get(operation_name, False)
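A minimal usage sketch for the task API above, assuming it runs inside a @workflow function (so a workflow context is active); the interface and operation names and the node selection are illustrative assumptions, not taken from this change:

    from aria import workflow
    from aria.orchestrator.workflows.api import task

    @workflow
    def restart(ctx, graph, **kwargs):
        for node in ctx.nodes:
            # create_task() returns None when the node does not declare the
            # interface/operation, so missing steps are silently skipped
            stop = task.create_task(node, 'Standard', 'stop')
            start = task.create_task(node, 'Standard', 'start')
            graph.sequence(stop, start)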
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py
new file mode 100644
index 0000000..900a0d1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task graph.
+"""
+
+from collections import Iterable
+
+from networkx import DiGraph, topological_sort
+
+from ....utils.uuid import generate_uuid
+from . import task as api_task
+
+
+class TaskNotInGraphError(Exception):
+ """
+ An error representing a scenario where a given task is not in the graph as expected.
+ """
+ pass
+
+
+def _filter_out_empty_tasks(func=None):
+ if func is None:
+ return lambda f: _filter_out_empty_tasks(func=f)
+
+ def _wrapper(task, *tasks, **kwargs):
+ return func(*(t for t in (task,) + tuple(tasks) if t), **kwargs)
+ return _wrapper
+
+
+class TaskGraph(object):
+ """
+ Task graph builder.
+ """
+
+ def __init__(self, name):
+ self.name = name
+ self._id = generate_uuid(variant='uuid')
+ self._graph = DiGraph()
+
+ def __repr__(self):
+ return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format(
+ name=self.__class__.__name__, self=self)
+
+ @property
+ def id(self):
+ """
+ ID of the graph
+ """
+ return self._id
+
+ # graph traversal methods
+
+ @property
+ def tasks(self):
+ """
+ Iterator over tasks in the graph.
+ """
+ for _, data in self._graph.nodes_iter(data=True):
+ yield data['task']
+
+ def topological_order(self, reverse=False):
+ """
+ Topological sort of the graph.
+
+ :param reverse: whether to reverse the sort
+ :return: list which represents the topological sort
+ """
+ for task_id in topological_sort(self._graph, reverse=reverse):
+ yield self.get_task(task_id)
+
+ def get_dependencies(self, dependent_task):
+ """
+ Iterates over the task's dependencies.
+
+ :param dependent_task: task whose dependencies are requested
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if
+ ``dependent_task`` is not in the graph
+ """
+ if not self.has_tasks(dependent_task):
+ raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id))
+ for _, dependency_id in self._graph.out_edges_iter(dependent_task.id):
+ yield self.get_task(dependency_id)
+
+ def get_dependents(self, dependency_task):
+ """
+ Iterates over the task's dependents.
+
+ :param dependency_task: task whose dependents are requested
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if
+ ``dependency_task`` is not in the graph
+ """
+ if not self.has_tasks(dependency_task):
+ raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id))
+ for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id):
+ yield self.get_task(dependent_id)
+
+ # task methods
+
+ def get_task(self, task_id):
+ """
+        Get a task instance that's been inserted into the graph by the task's ID.
+
+ :param basestring task_id: task ID
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if no task found in
+ the graph with the given ID
+ """
+ if not self._graph.has_node(task_id):
+ raise TaskNotInGraphError('Task id: {0}'.format(task_id))
+ data = self._graph.node[task_id]
+ return data['task']
+
+ @_filter_out_empty_tasks
+ def add_tasks(self, *tasks):
+ """
+        Adds tasks (and iterables of tasks) to the graph.
+
+        :param tasks: tasks
+ :return: list of added tasks
+ :rtype: list
+ """
+ assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks])
+ return_tasks = []
+
+ for task in tasks:
+ if isinstance(task, Iterable):
+ return_tasks += self.add_tasks(*task)
+ elif not self.has_tasks(task):
+ self._graph.add_node(task.id, task=task)
+ return_tasks.append(task)
+
+ return return_tasks
+
+ @_filter_out_empty_tasks
+ def remove_tasks(self, *tasks):
+ """
+        Removes the provided tasks (and iterables of tasks) from the graph.
+
+        :param tasks: tasks
+ :return: list of removed tasks
+ :rtype: list
+ """
+ return_tasks = []
+
+ for task in tasks:
+ if isinstance(task, Iterable):
+ return_tasks += self.remove_tasks(*task)
+ elif self.has_tasks(task):
+ self._graph.remove_node(task.id)
+ return_tasks.append(task)
+
+ return return_tasks
+
+ @_filter_out_empty_tasks
+ def has_tasks(self, *tasks):
+ """
+        Checks whether the provided tasks (and iterables of tasks) are in the graph.
+
+        :param tasks: tasks
+        :return: ``True`` if all tasks are in the graph, otherwise ``False``
+        :rtype: bool
+ """
+ assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks)
+ return_value = True
+
+ for task in tasks:
+ if isinstance(task, Iterable):
+ return_value &= self.has_tasks(*task)
+ else:
+ return_value &= self._graph.has_node(task.id)
+
+ return return_value
+
+ def add_dependency(self, dependent, dependency):
+ """
+ Adds a dependency for one item (task, sequence or parallel) on another.
+
+ The dependent will only be executed after the dependency terminates. If either of the items
+ is either a sequence or a parallel, multiple dependencies may be added.
+
+ :param dependent: dependent (task, sequence or parallel)
+ :param dependency: dependency (task, sequence or parallel)
+ :return: ``True`` if the dependency between the two hadn't already existed, otherwise
+ ``False``
+ :rtype: bool
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+ dependent or dependency are tasks which are not in the graph
+ """
+ if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+ raise TaskNotInGraphError()
+
+ if self.has_dependency(dependent, dependency):
+ return
+
+ if isinstance(dependent, Iterable):
+ for dependent_task in dependent:
+ self.add_dependency(dependent_task, dependency)
+ else:
+ if isinstance(dependency, Iterable):
+ for dependency_task in dependency:
+ self.add_dependency(dependent, dependency_task)
+ else:
+ self._graph.add_edge(dependent.id, dependency.id)
+
+ def has_dependency(self, dependent, dependency):
+ """
+ Checks whether one item (task, sequence or parallel) depends on another.
+
+ Note that if either of the items is either a sequence or a parallel, and some of the
+ dependencies exist in the graph but not all of them, this method will return ``False``.
+
+ :param dependent: dependent (task, sequence or parallel)
+ :param dependency: dependency (task, sequence or parallel)
+ :return: ``True`` if the dependency between the two exists, otherwise ``False``
+ :rtype: bool
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+ dependent or dependency are tasks which are not in the graph
+ """
+ if not (dependent and dependency):
+ return False
+ elif not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+ raise TaskNotInGraphError()
+
+ return_value = True
+
+ if isinstance(dependent, Iterable):
+ for dependent_task in dependent:
+ return_value &= self.has_dependency(dependent_task, dependency)
+ else:
+ if isinstance(dependency, Iterable):
+ for dependency_task in dependency:
+ return_value &= self.has_dependency(dependent, dependency_task)
+ else:
+ return_value &= self._graph.has_edge(dependent.id, dependency.id)
+
+ return return_value
+
+ def remove_dependency(self, dependent, dependency):
+ """
+ Removes a dependency for one item (task, sequence or parallel) on another.
+
+ Note that if either of the items is either a sequence or a parallel, and some of the
+ dependencies exist in the graph but not all of them, this method will not remove any of the
+ dependencies and return ``False``.
+
+ :param dependent: dependent (task, sequence or parallel)
+ :param dependency: dependency (task, sequence or parallel)
+ :return: ``False`` if the dependency between the two hadn't existed, otherwise ``True``
+ :rtype: bool
+ :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+ dependent or dependency are tasks which are not in the graph
+ """
+ if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+ raise TaskNotInGraphError()
+
+ if not self.has_dependency(dependent, dependency):
+ return
+
+ if isinstance(dependent, Iterable):
+ for dependent_task in dependent:
+ self.remove_dependency(dependent_task, dependency)
+ elif isinstance(dependency, Iterable):
+ for dependency_task in dependency:
+ self.remove_dependency(dependent, dependency_task)
+ else:
+ self._graph.remove_edge(dependent.id, dependency.id)
+
+ @_filter_out_empty_tasks
+ def sequence(self, *tasks):
+ """
+        Creates and inserts a sequence into the graph, such that each task i depends on task i-1.
+
+        :param tasks: tasks to insert as a sequence
+ :return: provided tasks
+ """
+ if tasks:
+ self.add_tasks(*tasks)
+
+ for i in xrange(1, len(tasks)):
+ self.add_dependency(tasks[i], tasks[i-1])
+
+ return tasks
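A short sketch of building a graph with the methods above, again inside a @workflow function (StubTask, like every BaseTask, resolves the current workflow context when none is passed); the names are illustrative:

    from aria import workflow
    from aria.orchestrator.workflows.api import task

    @workflow
    def three_steps(ctx, graph, **kwargs):
        provision, configure, start = task.StubTask(), task.StubTask(), task.StubTask()
        graph.sequence(provision, configure, start)   # start depends on configure, configure on provision
        assert graph.has_dependency(start, configure)
        # reverse=True yields dependencies before dependents, i.e. the execution order
        ordered = list(graph.topological_order(reverse=True))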
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py
new file mode 100644
index 0000000..1b2f390
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in workflows.
+"""
+
+from .install import install
+from .uninstall import uninstall
+from .start import start
+from .stop import stop
+
+
+BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop')
+BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin'
+
+
+__all__ = [
+ 'BUILTIN_WORKFLOWS',
+ 'install',
+ 'uninstall',
+ 'start',
+ 'stop'
+]
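A hypothetical helper (not part of this change) showing how BUILTIN_WORKFLOWS_PATH_PREFIX can be combined with a built-in workflow name; it assumes each built-in lives in a module of the same name that exposes a function of the same name, as the imports above suggest:

    from importlib import import_module

    from aria.orchestrator.workflows.builtin import (BUILTIN_WORKFLOWS,
                                                     BUILTIN_WORKFLOWS_PATH_PREFIX)

    def builtin_workflow_function(name):
        # e.g. 'install' -> aria.orchestrator.workflows.builtin.install.install
        assert name in BUILTIN_WORKFLOWS
        module = import_module('{0}.{1}'.format(BUILTIN_WORKFLOWS_PATH_PREFIX, name))
        return getattr(module, name)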
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py
new file mode 100644
index 0000000..949f864
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in operation execution workflow.
+"""
+
+from ... import workflow
+from ..api import task
+
+
+@workflow
+def execute_operation(
+ ctx,
+ graph,
+ interface_name,
+ operation_name,
+ operation_kwargs,
+ run_by_dependency_order,
+ type_names,
+ node_template_ids,
+ node_ids,
+ **kwargs):
+ """
+    Built-in operation execution workflow.
+
+    :param ctx: workflow context
+    :param graph: graph which will describe the workflow
+    :param interface_name: name of the interface holding the operation
+    :param operation_name: name of the operation to execute
+    :param operation_kwargs: argument overrides for the operation
+    :param run_by_dependency_order: whether to run tasks in node dependency order
+    :param type_names: limit to nodes whose type name is listed (empty means no limit)
+    :param node_template_ids: limit to nodes of the listed node templates (empty means no limit)
+    :param node_ids: limit to the listed node IDs (empty means no limit)
+    :param kwargs: additional (unused) keyword arguments
+ """
+ subgraphs = {}
+ # filtering node instances
+ filtered_nodes = list(_filter_nodes(
+ context=ctx,
+ node_template_ids=node_template_ids,
+ node_ids=node_ids,
+ type_names=type_names))
+
+ if run_by_dependency_order:
+ filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
+ for node in ctx.nodes:
+ if node.id not in filtered_node_ids:
+ subgraphs[node.id] = ctx.task_graph(
+ name='execute_operation_stub_{0}'.format(node.id))
+
+ # registering actual tasks to sequences
+ for node in filtered_nodes:
+ graph.add_tasks(
+ task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ arguments=operation_kwargs
+ )
+ )
+
+ for _, node_sub_workflow in subgraphs.items():
+ graph.add_tasks(node_sub_workflow)
+
+ # adding tasks dependencies if required
+ if run_by_dependency_order:
+ for node in ctx.nodes:
+ for relationship in node.relationships:
+ graph.add_dependency(
+                    subgraphs[node.id], [subgraphs[relationship.target_id]])
+
+
+def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
+ def _is_node_template_by_id(node_template_id):
+ return not node_template_ids or node_template_id in node_template_ids
+
+ def _is_node_by_id(node_id):
+ return not node_ids or node_id in node_ids
+
+ def _is_node_by_type(node_type):
+        return not type_names or node_type.name in type_names
+
+ for node in context.nodes:
+ if all((_is_node_template_by_id(node.node_template.id),
+ _is_node_by_id(node.id),
+ _is_node_by_type(node.node_template.type))):
+ yield node
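The three predicates in _filter_nodes combine with AND, with an empty filter matching everything; an equivalent flattened form, for illustration only:

    def _matches(node, node_template_ids=(), node_ids=(), type_names=()):
        # illustrative, equivalent to the composed predicates above
        return ((not node_template_ids or node.node_template.id in node_template_ids) and
                (not node_ids or node.id in node_ids) and
                (not type_names or node.node_template.type.name in type_names))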
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
new file mode 100644
index 0000000..07e27b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: skip-file
+
+"""
+Built-in heal workflow.
+"""
+
+from aria import workflow
+
+from .workflows import (install_node, uninstall_node)
+from ..api import task
+
+
+@workflow
+def heal(ctx, graph, node_id):
+ """
+    Built-in heal workflow.
+
+ :param ctx: workflow context
+ :param graph: graph which will describe the workflow.
+ :param node_id: ID of the node to heal
+ :return:
+ """
+ failing_node = ctx.model.node.get(node_id)
+ host_node = ctx.model.node.get(failing_node.host.id)
+ failed_node_subgraph = _get_contained_subgraph(ctx, host_node)
+ failed_node_ids = list(n.id for n in failed_node_subgraph)
+
+ targeted_nodes = [node for node in ctx.nodes
+ if node.id not in failed_node_ids]
+
+ uninstall_subgraph = task.WorkflowTask(
+ heal_uninstall,
+ failing_nodes=failed_node_subgraph,
+ targeted_nodes=targeted_nodes
+ )
+
+ install_subgraph = task.WorkflowTask(
+ heal_install,
+ failing_nodes=failed_node_subgraph,
+ targeted_nodes=targeted_nodes)
+
+ graph.sequence(uninstall_subgraph, install_subgraph)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
+ """
+ Uninstall phase of the heal mechanism.
+
+ :param ctx: workflow context
+ :param graph: task graph to edit
+ :param failing_nodes: failing nodes to heal
+    :param targeted_nodes: unaffected nodes that may be targets of relationships from the failing nodes
+ """
+ node_sub_workflows = {}
+
+    # Create a stub task for each unaffected node
+ for node in targeted_nodes:
+ node_stub = task.StubTask()
+ node_sub_workflows[node.id] = node_stub
+ graph.add_tasks(node_stub)
+
+    # Create an uninstall sub-workflow for each failing node
+ for node in failing_nodes:
+ node_sub_workflow = task.WorkflowTask(uninstall_node,
+ node=node)
+ node_sub_workflows[node.id] = node_sub_workflow
+ graph.add_tasks(node_sub_workflow)
+
+    # Create dependencies between the node sub-workflows
+ for node in failing_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+ for relationship in reversed(node.outbound_relationships):
+ graph.add_dependency(
+ node_sub_workflows[relationship.target_node.id],
+ node_sub_workflow)
+
+    # Add unlink operations for intact nodes that depend on a failing node
+ for node in targeted_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+
+ for relationship in reversed(node.outbound_relationships):
+
+ target_node = \
+ ctx.model.node.get(relationship.target_node.id)
+ target_node_subgraph = node_sub_workflows[target_node.id]
+ graph.add_dependency(target_node_subgraph, node_sub_workflow)
+
+ if target_node in failing_nodes:
+ dependency = task.create_relationship_tasks(
+ relationship=relationship,
+ operation_name='aria.interfaces.relationship_lifecycle.unlink')
+ graph.add_tasks(*dependency)
+ graph.add_dependency(node_sub_workflow, dependency)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_install(ctx, graph, failing_nodes, targeted_nodes):
+ """
+ Install phase of the heal mechanism.
+
+ :param ctx: workflow context
+ :param graph: task graph to edit.
+ :param failing_nodes: failing nodes to heal
+    :param targeted_nodes: unaffected nodes that may be targets of relationships from the failing nodes
+ """
+ node_sub_workflows = {}
+
+    # Create a stub task for each unaffected node
+ for node in targeted_nodes:
+ node_stub = task.StubTask()
+ node_sub_workflows[node.id] = node_stub
+ graph.add_tasks(node_stub)
+
+    # Create an install sub-workflow for each failing node
+ for node in failing_nodes:
+ node_sub_workflow = task.WorkflowTask(install_node,
+ node=node)
+ node_sub_workflows[node.id] = node_sub_workflow
+ graph.add_tasks(node_sub_workflow)
+
+    # Create dependencies between the node sub-workflows
+ for node in failing_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+ if node.outbound_relationships:
+ dependencies = \
+ [node_sub_workflows[relationship.target_node.id]
+ for relationship in node.outbound_relationships]
+ graph.add_dependency(node_sub_workflow, dependencies)
+
+    # Add establish operations for intact nodes that depend on a failing node
+ for node in targeted_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+
+ for relationship in node.outbound_relationships:
+ target_node = ctx.model.node.get(
+ relationship.target_node.id)
+ target_node_subworkflow = node_sub_workflows[target_node.id]
+ graph.add_dependency(node_sub_workflow, target_node_subworkflow)
+
+ if target_node in failing_nodes:
+ dependent = task.create_relationship_tasks(
+ relationship=relationship,
+ operation_name='aria.interfaces.relationship_lifecycle.establish')
+ graph.add_tasks(*dependent)
+ graph.add_dependency(dependent, node_sub_workflow)
+
+
+def _get_contained_subgraph(context, host_node):
+ contained_instances = [node
+ for node in context.nodes
+ if node.host_fk == host_node.id and
+ node.host_fk != node.id]
+ result = [host_node]
+
+ if not contained_instances:
+ return result
+
+ result.extend(contained_instances)
+ for node in contained_instances:
+ result.extend(_get_contained_subgraph(context, node))
+
+ return set(result)
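A sketch of invoking heal as a sub-workflow from another workflow, assuming the failing node's ID is known; this mirrors how heal itself wraps heal_uninstall and heal_install with WorkflowTask:

    from aria import workflow
    from aria.orchestrator.workflows.api import task
    from aria.orchestrator.workflows.builtin.heal import heal

    @workflow
    def heal_one(ctx, graph, failing_node_id, **kwargs):
        # the heal sub-workflow builds its own uninstall-then-install sequence
        graph.add_tasks(task.WorkflowTask(heal, node_id=failing_node_id))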
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py
new file mode 100644
index 0000000..1e7c531
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in install workflow.
+"""
+
+from ... import workflow
+from ..api import task as api_task
+from . import workflows
+
+
+@workflow
+def install(ctx, graph):
+ """
+ Built-in install workflow.
+ """
+ tasks_and_nodes = []
+ for node in ctx.nodes:
+ tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node))
+ graph.add_tasks([task for task, _ in tasks_and_nodes])
+ workflows.create_node_task_dependencies(graph, tasks_and_nodes)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py
new file mode 100644
index 0000000..c02a26d
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in start workflow.
+"""
+
+from .workflows import start_node
+from ... import workflow
+from ..api import task as api_task
+
+
+@workflow
+def start(ctx, graph):
+ """
+ Built-in start workflow.
+ """
+ for node in ctx.model.node.iter():
+ graph.add_tasks(api_task.WorkflowTask(start_node, node=node))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py
new file mode 100644
index 0000000..6f9930b
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in stop workflow.
+"""
+
+from .workflows import stop_node
+from ..api import task as api_task
+from ... import workflow
+
+
+@workflow
+def stop(ctx, graph):
+ """
+ Built-in stop workflow.
+ """
+ for node in ctx.model.node.iter():
+ graph.add_tasks(api_task.WorkflowTask(stop_node, node=node))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py
new file mode 100644
index 0000000..7925f4b
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in uninstall workflow.
+"""
+
+from ... import workflow
+from ..api import task as api_task
+from . import workflows
+
+
+@workflow
+def uninstall(ctx, graph):
+ """
+ Built-in uninstall workflow.
+ """
+ tasks_and_nodes = []
+ for node in ctx.nodes:
+ tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node))
+ graph.add_tasks([task for task, _ in tasks_and_nodes])
+ workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py
new file mode 100644
index 0000000..b286e98
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+TOSCA normative lifecycle workflows.
+"""
+
+from ... import workflow
+from ..api import task
+
+
+NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
+NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure'
+
+NORMATIVE_CREATE = 'create'
+NORMATIVE_CONFIGURE = 'configure'
+NORMATIVE_START = 'start'
+NORMATIVE_STOP = 'stop'
+NORMATIVE_DELETE = 'delete'
+
+NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source'
+NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target'
+NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source'
+NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target'
+
+NORMATIVE_ADD_SOURCE = 'add_source'
+NORMATIVE_ADD_TARGET = 'add_target'
+NORMATIVE_REMOVE_TARGET = 'remove_target'
+NORMATIVE_REMOVE_SOURCE = 'remove_source'
+NORMATIVE_TARGET_CHANGED = 'target_changed'
+
+
+__all__ = (
+ 'NORMATIVE_STANDARD_INTERFACE',
+ 'NORMATIVE_CONFIGURE_INTERFACE',
+ 'NORMATIVE_CREATE',
+ 'NORMATIVE_START',
+ 'NORMATIVE_STOP',
+ 'NORMATIVE_DELETE',
+ 'NORMATIVE_CONFIGURE',
+ 'NORMATIVE_PRE_CONFIGURE_SOURCE',
+ 'NORMATIVE_PRE_CONFIGURE_TARGET',
+ 'NORMATIVE_POST_CONFIGURE_SOURCE',
+ 'NORMATIVE_POST_CONFIGURE_TARGET',
+ 'NORMATIVE_ADD_SOURCE',
+ 'NORMATIVE_ADD_TARGET',
+ 'NORMATIVE_REMOVE_SOURCE',
+ 'NORMATIVE_REMOVE_TARGET',
+ 'NORMATIVE_TARGET_CHANGED',
+ 'install_node',
+ 'uninstall_node',
+ 'start_node',
+ 'stop_node',
+)
+
+
+@workflow(suffix_template='{node.name}')
+def install_node(graph, node, **kwargs):
+ # Create
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+
+ # Configure
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_PRE_CONFIGURE_SOURCE,
+ NORMATIVE_PRE_CONFIGURE_TARGET)
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_POST_CONFIGURE_SOURCE,
+ NORMATIVE_POST_CONFIGURE_TARGET)
+ # Start
+ sequence += _create_start_tasks(node)
+
+ graph.sequence(*sequence)
+
+
+@workflow(suffix_template='{node.name}')
+def uninstall_node(graph, node, **kwargs):
+ # Stop
+ sequence = _create_stop_tasks(node)
+
+ # Delete
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
+
+ graph.sequence(*sequence)
+
+
+@workflow(suffix_template='{node.name}')
+def start_node(graph, node, **kwargs):
+ graph.sequence(*_create_start_tasks(node))
+
+
+@workflow(suffix_template='{node.name}')
+def stop_node(graph, node, **kwargs):
+ graph.sequence(*_create_stop_tasks(node))
+
+
+def _create_start_tasks(node):
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
+ return sequence
+
+
+def _create_stop_tasks(node):
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
+ return sequence
+
+
+def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
+ """
+ Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
+ """
+
+ def get_task(node_name):
+ for api_task, task_node in tasks_and_nodes:
+ if task_node.name == node_name:
+ return api_task
+ return None
+
+ for api_task, node in tasks_and_nodes:
+ dependencies = []
+ for relationship in node.outbound_relationships:
+ dependency = get_task(relationship.target_node.name)
+ if dependency:
+ dependencies.append(dependency)
+ if dependencies:
+ if reverse:
+ for dependency in dependencies:
+ graph.add_dependency(dependency, api_task)
+ else:
+ graph.add_dependency(api_task, dependencies)
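A sketch wrapping the per-node lifecycle workflow above for a single node, the same way install.py does for all nodes; the workflow name is illustrative, and operations missing on the node or its relationships are dropped by create_task/create_relationships_tasks:

    from aria import workflow
    from aria.orchestrator.workflows.api import task
    from aria.orchestrator.workflows.builtin.workflows import install_node

    @workflow
    def install_single(ctx, graph, node, **kwargs):
        # resulting sequence: create -> pre_configure_* -> configure ->
        #                     post_configure_* -> start -> add_*
        graph.add_tasks(task.WorkflowTask(install_node, node=node))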
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py
new file mode 100644
index 0000000..3f28136
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow core.
+"""
+
+from . import engine
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py
new file mode 100644
index 0000000..0ec3cd8
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow execution.
+"""
+
+import time
+from datetime import datetime
+
+from aria import logger
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.context import operation
+
+from .. import exceptions
+from ..executor.base import StubTaskExecutor
+# Import required so all signals are registered
+from . import events_handler # pylint: disable=unused-import
+
+
+class Engine(logger.LoggerMixin):
+ """
+ Executes workflows.
+ """
+
+ def __init__(self, executors, **kwargs):
+ super(Engine, self).__init__(**kwargs)
+ self._executors = executors.copy()
+ self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
+
+ def execute(self, ctx, resuming=False, retry_failed=False):
+ """
+ Executes the workflow.
+ """
+ if resuming:
+ events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
+
+ tasks_tracker = _TasksTracker(ctx)
+
+ try:
+ events.start_workflow_signal.send(ctx)
+ while True:
+ cancel = self._is_cancel(ctx)
+ if cancel:
+ break
+ for task in tasks_tracker.ended_tasks:
+ self._handle_ended_tasks(task)
+ tasks_tracker.finished(task)
+ for task in tasks_tracker.executable_tasks:
+ tasks_tracker.executing(task)
+ self._handle_executable_task(ctx, task)
+ if tasks_tracker.all_tasks_consumed:
+ break
+ else:
+ time.sleep(0.1)
+ if cancel:
+ self._terminate_tasks(tasks_tracker.executing_tasks)
+ events.on_cancelled_workflow_signal.send(ctx)
+ else:
+ events.on_success_workflow_signal.send(ctx)
+ except BaseException as e:
+ # Cleanup any remaining tasks
+ self._terminate_tasks(tasks_tracker.executing_tasks)
+ events.on_failure_workflow_signal.send(ctx, exception=e)
+ raise
+
+ def _terminate_tasks(self, tasks):
+ for task in tasks:
+ try:
+ self._executors[task._executor].terminate(task.id)
+ except BaseException:
+ pass
+
+ @staticmethod
+ def cancel_execution(ctx):
+ """
+        Send a cancel request to the engine. If execution has already started, its status will be
+        set to ``cancelling``; if it is still pending, its status will be set to ``cancelled``
+        directly.
+ """
+ events.on_cancelling_workflow_signal.send(ctx)
+
+ @staticmethod
+ def _is_cancel(ctx):
+ execution = ctx.model.execution.refresh(ctx.execution)
+ return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
+
+ def _handle_executable_task(self, ctx, task):
+ task_executor = self._executors[task._executor]
+
+ # If the task is a stub, a default context is provided, else it should hold the context cls
+ context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls
+ op_ctx = context_cls(
+ model_storage=ctx.model,
+ resource_storage=ctx.resource,
+ workdir=ctx._workdir,
+ task_id=task.id,
+ actor_id=task.actor.id if task.actor else None,
+ service_id=task.execution.service.id,
+ execution_id=task.execution.id,
+ name=task.name
+ )
+
+ if not task._stub_type:
+ events.sent_task_signal.send(op_ctx)
+ task_executor.execute(op_ctx)
+
+ @staticmethod
+ def _handle_ended_tasks(task):
+ if task.status == models.Task.FAILED and not task.ignore_failure:
+ raise exceptions.ExecutorException('Workflow failed')
+
+
+class _TasksTracker(object):
+
+ def __init__(self, ctx):
+ self._ctx = ctx
+
+ self._tasks = ctx.execution.tasks
+ self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+ self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
+ self._executing_tasks = []
+
+ @property
+ def all_tasks_consumed(self):
+ return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
+
+ def executing(self, task):
+        # A retrying task may already be in the executing list (added on a previous attempt)
+ if task not in self._executing_tasks:
+ self._executable_tasks.remove(task)
+ self._executing_tasks.append(task)
+
+ def finished(self, task):
+ self._executing_tasks.remove(task)
+ self._executed_tasks.append(task)
+
+ @property
+ def ended_tasks(self):
+ for task in self.executing_tasks:
+ if task.has_ended():
+ yield task
+
+ @property
+ def executable_tasks(self):
+ now = datetime.utcnow()
+        # We need both lists since retrying tasks are in the executing tasks list.
+ for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+ if all([task.is_waiting(),
+ task.due_at <= now,
+ all(dependency in self._executed_tasks for dependency in task.dependencies)
+ ]):
+ yield task
+
+ @property
+ def executing_tasks(self):
+ for task in self._update_tasks(self._executing_tasks):
+ yield task
+
+ @property
+ def executed_tasks(self):
+ for task in self._update_tasks(self._executed_tasks):
+ yield task
+
+ @property
+ def tasks(self):
+ for task in self._update_tasks(self._tasks):
+ yield task
+
+ def _update_tasks(self, tasks):
+ for task in tasks:
+ yield self._ctx.model.task.refresh(task)
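A minimal sketch of driving the engine, assuming a compiled workflow context and an executor instance (for example one of the executors under ..executor) are prepared by the caller:

    from aria.orchestrator.workflows.core import engine

    def run_workflow(workflow_ctx, executor):
        # `workflow_ctx` is a compiled workflow context and `executor` an executor
        # instance, both assumed to be set up elsewhere
        eng = engine.Engine(executors={type(executor): executor})
        eng.execute(ctx=workflow_ctx)
        # a cancel request may be sent from another thread while execute() runs:
        #     engine.Engine.cancel_execution(workflow_ctx)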
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
new file mode 100644
index 0000000..473475e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow event handling.
+"""
+
+from datetime import (
+ datetime,
+ timedelta,
+)
+
+from ... import events
+from ... import exceptions
+
+
+@events.sent_task_signal.connect
+def _task_sent(ctx, *args, **kwargs):
+ with ctx.persist_changes:
+ ctx.task.status = ctx.task.SENT
+
+
+@events.start_task_signal.connect
+def _task_started(ctx, *args, **kwargs):
+ with ctx.persist_changes:
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+ _update_node_state_if_necessary(ctx, is_transitional=True)
+
+
+@events.on_failure_task_signal.connect
+def _task_failed(ctx, exception, *args, **kwargs):
+ with ctx.persist_changes:
+ should_retry = all([
+ not isinstance(exception, exceptions.TaskAbortException),
+ ctx.task.attempts_count < ctx.task.max_attempts or
+ ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+ # ignore_failure check here means the task will not be retried and it will be marked
+ # as failed. The engine will also look at ignore_failure so it won't fail the
+ # workflow.
+ not ctx.task.ignore_failure
+ ])
+ if should_retry:
+ retry_interval = None
+ if isinstance(exception, exceptions.TaskRetryException):
+ retry_interval = exception.retry_interval
+ if retry_interval is None:
+ retry_interval = ctx.task.retry_interval
+ ctx.task.status = ctx.task.RETRYING
+ ctx.task.attempts_count += 1
+ ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+ else:
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.FAILED
+
+
+@events.on_success_task_signal.connect
+def _task_succeeded(ctx, *args, **kwargs):
+ with ctx.persist_changes:
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
+ ctx.task.attempts_count += 1
+
+ _update_node_state_if_necessary(ctx)
+
+
+@events.start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ # the execution may already be in the process of cancelling
+ if execution.status in (execution.CANCELLING, execution.CANCELLED):
+ return
+ execution.status = execution.STARTED
+ execution.started_at = datetime.utcnow()
+
+
+@events.on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ execution.error = str(exception)
+ execution.status = execution.FAILED
+ execution.ended_at = datetime.utcnow()
+
+
+@events.on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ execution.status = execution.SUCCEEDED
+ execution.ended_at = datetime.utcnow()
+
+
+@events.on_cancelled_workflow_signal.connect
+def _workflow_cancelled(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ # _workflow_cancelling function may have called this function already
+ if execution.status == execution.CANCELLED:
+ return
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLED
+ execution.ended_at = datetime.utcnow()
+
+
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ execution.status = execution.PENDING
+        # Any task that has not ended is put back to PENDING state
+ for task in execution.tasks:
+ if not task.has_ended():
+ task.status = task.PENDING
+
+ if retry_failed:
+ for task in execution.tasks:
+ if task.status == task.FAILED and not task.ignore_failure:
+ task.attempts_count = 0
+ task.status = task.PENDING
+
+
+@events.on_cancelling_workflow_signal.connect
+def _workflow_cancelling(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ if execution.status == execution.PENDING:
+ return _workflow_cancelled(workflow_context=workflow_context)
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLING
+
+
+def _update_node_state_if_necessary(ctx, is_transitional=False):
+ # TODO: this is not the right way to check! the interface name is arbitrary
+ # and also will *never* be the type name
+ node = ctx.task.node if ctx.task is not None else None
+ if (node is not None) and \
+ (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
+ 'tosca:Standard')):
+ state = node.determine_state(op_name=ctx.task.operation_name,
+ is_transitional=is_transitional)
+ if state:
+ node.state = state
+ ctx.model.node.update(node)
+
+
+def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
+ workflow_context.logger.info(
+        "'{workflow_name}' workflow execution {status} before the cancel request "
+        "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
new file mode 100644
index 0000000..81543d5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ....modeling import models
+from .. import executor, api
+
+
+class GraphCompiler(object):
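+    """
+    Compiles an API task graph into the stored model tasks that the workflow
+    engine executes (see ``compile`` below).
+    """
+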
+ def __init__(self, ctx, default_executor):
+ self._ctx = ctx
+ self._default_executor = default_executor
+ self._stub_executor = executor.base.StubTaskExecutor
+ self._model_to_api_id = {}
+
+ def compile(self,
+ task_graph,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
+ """
+        Translates the user graph to the execution graph.
+
+        :param task_graph: The user's graph
+ :param start_stub_type: internal use
+ :param end_stub_type: internal use
+ :param depends_on: internal use
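+
+        Illustrative example (the id is arbitrary): compiling a graph whose id
+        is ``42`` creates a ``42-Start`` stub task, one model task per API task
+        (sub-workflow tasks are compiled recursively between their own
+        start/end stubs) and a ``42-End`` stub task that depends on every task
+        without dependents.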
+ """
+ depends_on = list(depends_on)
+
+ # Insert start marker
+ start_task = self._create_stub_task(
+ start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name,
+ )
+
+ for task in task_graph.topological_order(reverse=True):
+ dependencies = \
+ (self._get_tasks_from_dependencies(task_graph.get_dependencies(task))
+ or [start_task])
+
+ if isinstance(task, api.task.OperationTask):
+ self._create_operation_task(task, dependencies)
+
+ elif isinstance(task, api.task.WorkflowTask):
+ # Build the graph recursively while adding start and end markers
+ self.compile(
+ task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies
+ )
+ elif isinstance(task, api.task.StubTask):
+ self._create_stub_task(models.Task.STUB, dependencies, task.id)
+ else:
+ raise RuntimeError('Undefined state')
+
+ # Insert end marker
+ self._create_stub_task(
+ end_stub_type,
+ self._get_non_dependent_tasks(self._ctx.execution) or [start_task],
+ self._end_graph_suffix(task_graph.id),
+ task_graph.name
+ )
+
+ def _create_stub_task(self, stub_type, dependencies, api_id, name=None):
+ model_task = models.Task(
+ name=name,
+ dependencies=dependencies,
+ execution=self._ctx.execution,
+ _executor=self._stub_executor,
+ _stub_type=stub_type)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_id
+ return model_task
+
+ def _create_operation_task(self, api_task, dependencies):
+ model_task = models.Task.from_api_task(
+ api_task, self._default_executor, dependencies=dependencies)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_task.id
+ return model_task
+
+ @staticmethod
+ def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+ @staticmethod
+ def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
+
+ @staticmethod
+ def _get_non_dependent_tasks(execution):
+ tasks_with_dependencies = set()
+ for task in execution.tasks:
+ tasks_with_dependencies.update(task.dependencies)
+ return list(set(execution.tasks) - set(tasks_with_dependencies))
+
+ def _get_tasks_from_dependencies(self, dependencies):
+ """
+ Returns task list from dependencies.
+ """
+ tasks = []
+ for dependency in dependencies:
+ if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)):
+ dependency_name = dependency.id
+ else:
+ dependency_name = self._end_graph_suffix(dependency.id)
+ tasks.extend(task for task in self._ctx.execution.tasks
+ if self._model_to_api_id.get(task.id, None) == dependency_name)
+ return tasks
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py
new file mode 100644
index 0000000..9eee1e1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Workflow event logging.
+"""
+
+from .. import events
+from ... import modeling
+
+
+def _get_task_name(task):
+ if isinstance(task.actor, modeling.model_bases.service_instance.RelationshipBase):
+ return '{source_node.name}->{target_node.name}'.format(
+ source_node=task.actor.source_node, target_node=task.actor.target_node)
+ else:
+ return task.actor.name
+
+
+@events.start_task_signal.connect
+def _start_task_handler(ctx, **kwargs):
+ # If the task has no function this is an empty task.
+ if ctx.task.function:
+ suffix = 'started...'
+ logger = ctx.logger.info
+ else:
+ suffix = 'has no implementation'
+ logger = ctx.logger.debug
+
+ logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+ name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+
+
+@events.on_success_task_signal.connect
+def _success_task_handler(ctx, **kwargs):
+ if not ctx.task.function:
+ return
+ ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+ .format(name=_get_task_name(ctx.task), task=ctx.task))
+
+
+@events.on_failure_task_signal.connect
+def _failure_operation_handler(ctx, traceback, **kwargs):
+ ctx.logger.error(
+ '{name} {task.interface_name}.{task.operation_name} failed'
+ .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+ )
+
+
+@events.start_workflow_signal.connect
+def _start_workflow_handler(context, **kwargs):
+ context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+
+
+@events.on_failure_workflow_signal.connect
+def _failure_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+
+
+@events.on_success_workflow_signal.connect
+def _success_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
+
+
+@events.on_cancelled_workflow_signal.connect
+def _cancel_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
+
+
+@events.on_cancelling_workflow_signal.connect
+def _cancelling_workflow_handler(context, **kwargs):
+ context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py
new file mode 100644
index 0000000..2a1d6b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow exceptions.
+"""
+
+import os
+
+from .. import exceptions
+
+
+class ExecutorException(exceptions.AriaError):
+ """
+ General executor exception.
+ """
+ pass
+
+
+class ProcessException(ExecutorException):
+ """
+ Raised when subprocess execution fails.
+ """
+
+ def __init__(self, command, stderr=None, stdout=None, return_code=None):
+ """
+        Exception raised for a failed child process.
+
+        :param list command: child process command
+ :param str stderr: child process stderr
+ :param str stdout: child process stdout
+ :param int return_code: child process exit code
+ """
+ super(ProcessException, self).__init__("child process failed")
+ self.command = command
+ self.stderr = stderr
+ self.stdout = stdout
+ self.return_code = return_code
+
+ @property
+ def explanation(self):
+ """
+ Describes the error in detail
+ """
+ return (
+ 'Command "{error.command}" executed with an error.{0}'
+ 'code: {error.return_code}{0}'
+ 'error: {error.stderr}{0}'
+ 'output: {error.stdout}'.format(os.linesep, error=self))
+
+
+class AriaEngineError(exceptions.AriaError):
+ """
+ Raised by the workflow engine.
+ """
+
+
+class TaskException(exceptions.AriaError):
+ """
+ Raised by the task.
+ """
+
+
+class TaskCreationException(TaskException):
+ """
+ Could not create the task.
+ """
+
+
+class OperationNotFoundException(TaskCreationException):
+ """
+ Could not find an operation on the node or relationship.
+ """
+
+
+class PluginNotFoundException(TaskCreationException):
+ """
+ Could not find a plugin matching the plugin specification.
+ """
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py
new file mode 100644
index 0000000..cafab74
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task executors.
+"""
+
+
+from . import process, thread
+from .base import BaseExecutor
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py
new file mode 100644
index 0000000..e7d03ea
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Base class for task executors.
+"""
+
+from aria import logger
+from aria.orchestrator import events
+
+
+class BaseExecutor(logger.LoggerMixin):
+ """
+ Base class for task executors.
+ """
+ def _execute(self, ctx):
+ raise NotImplementedError
+
+ def execute(self, ctx):
+ """
+ Executes a task.
+
+        :param ctx: context of the task to execute
+ """
+ if ctx.task.function:
+ self._execute(ctx)
+ else:
+ # In this case the task is missing a function. This task still gets to an
+ # executor, but since there is nothing to run, we by default simply skip the
+ # execution itself.
+ self._task_started(ctx)
+ self._task_succeeded(ctx)
+
+ def close(self):
+ """
+ Closes the executor.
+ """
+ pass
+
+ def terminate(self, task_id):
+ """
+        Terminates the executing task.
+ """
+ pass
+
+ @staticmethod
+ def _task_started(ctx):
+ events.start_task_signal.send(ctx)
+
+ @staticmethod
+ def _task_failed(ctx, exception, traceback=None):
+ events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+
+ @staticmethod
+ def _task_succeeded(ctx):
+ events.on_success_task_signal.send(ctx)
+
+
+class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
+ def execute(self, ctx, *args, **kwargs):
+ with ctx.persist_changes:
+ ctx.task.status = ctx.task.SUCCESS
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/celery.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/celery.py
new file mode 100644
index 0000000..a2b3513
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/celery.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Celery task executor.
+"""
+
+import threading
+import Queue
+
+from aria.orchestrator.workflows.executor import BaseExecutor
+
+
+class CeleryExecutor(BaseExecutor):
+ """
+ Celery task executor.
+ """
+
+ def __init__(self, app, *args, **kwargs):
+ super(CeleryExecutor, self).__init__(*args, **kwargs)
+ self._app = app
+ self._started_signaled = False
+ self._started_queue = Queue.Queue(maxsize=1)
+ self._tasks = {}
+ self._results = {}
+ self._receiver = None
+ self._stopped = False
+ self._receiver_thread = threading.Thread(target=self._events_receiver)
+ self._receiver_thread.daemon = True
+ self._receiver_thread.start()
+ self._started_queue.get(timeout=30)
+
+ def _execute(self, ctx):
+ self._tasks[ctx.id] = ctx
+ arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues())
+ arguments['ctx'] = ctx.context
+ self._results[ctx.id] = self._app.send_task(
+ ctx.operation_mapping,
+ kwargs=arguments,
+ task_id=ctx.task.id,
+ queue=self._get_queue(ctx))
+
+ def close(self):
+ self._stopped = True
+ if self._receiver:
+ self._receiver.should_stop = True
+ self._receiver_thread.join()
+
+ @staticmethod
+ def _get_queue(task):
+ return None if task else None # TODO
+
+ def _events_receiver(self):
+ with self._app.connection() as connection:
+ self._receiver = self._app.events.Receiver(connection, handlers={
+ 'task-started': self._celery_task_started,
+ 'task-succeeded': self._celery_task_succeeded,
+ 'task-failed': self._celery_task_failed,
+ })
+ for _ in self._receiver.itercapture(limit=None, timeout=None, wakeup=True):
+ if not self._started_signaled:
+ self._started_queue.put(True)
+ self._started_signaled = True
+ if self._stopped:
+ return
+
+ def _celery_task_started(self, event):
+ self._task_started(self._tasks[event['uuid']])
+
+ def _celery_task_succeeded(self, event):
+ task, _ = self._remove_task(event['uuid'])
+ self._task_succeeded(task)
+
+ def _celery_task_failed(self, event):
+ task, async_result = self._remove_task(event['uuid'])
+ try:
+ exception = async_result.result
+ except BaseException as e:
+ exception = RuntimeError(
+ 'Could not de-serialize exception of task {0} --> {1}: {2}'
+ .format(task.name, type(e).__name__, str(e)))
+ self._task_failed(task, exception=exception)
+
+ def _remove_task(self, task_id):
+ return self._tasks.pop(task_id), self._results.pop(task_id)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py
new file mode 100644
index 0000000..9314e5d
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Dry task executor.
+"""
+
+from datetime import datetime
+
+from . import base
+
+
+class DryExecutor(base.BaseExecutor): # pylint: disable=abstract-method
+ """
+ Dry task executor: prints task information without causing any side effects.
+ """
+ def execute(self, ctx):
+ with ctx.persist_changes:
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+
+ dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+ logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
+
+ if hasattr(ctx.task.actor, 'source_node'):
+ name = '{source_node.name}->{target_node.name}'.format(
+ source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
+ else:
+ name = ctx.task.actor.name
+
+ if ctx.task.function:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+ else:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py
new file mode 100644
index 0000000..185f15f
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Sub-process task executor.
+"""
+
+# pylint: disable=wrong-import-position
+
+import os
+import sys
+
+# As part of the process executor implementation, subprocesses are started with this module as
+# their entry point. We thus remove this module's directory from the python path if it happens
+# to be there.
+
+from collections import namedtuple
+
+script_dir = os.path.dirname(__file__)
+if script_dir in sys.path:
+ sys.path.remove(script_dir)
+
+import contextlib
+import io
+import threading
+import socket
+import struct
+import subprocess
+import tempfile
+import Queue
+import pickle
+
+import psutil
+import jsonpickle
+
+import aria
+from aria.orchestrator.workflows.executor import base
+from aria.extension import process_executor
+from aria.utils import (
+ imports,
+ exceptions,
+ process as process_utils
+)
+
+
+_INT_FMT = 'I'
+_INT_SIZE = struct.calcsize(_INT_FMT)
+UPDATE_TRACKED_CHANGES_FAILED_STR = \
+ 'Some changes failed writing to storage. For more info refer to the log.'
+
+
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
+class ProcessExecutor(base.BaseExecutor):
+ """
+ Sub-process task executor.
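+
+    Rough flow (a summary of the code below): for each task the executor
+    pickles the task arguments into a temporary file, starts
+    ``python <this module> <arguments path>`` as a subprocess and listens on a
+    local TCP socket; the subprocess (see ``_main`` below) reports
+    ``started``/``succeeded``/``failed`` back over that socket via
+    ``_Messenger``.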
+ """
+
+ def __init__(self, plugin_manager=None, python_path=None, *args, **kwargs):
+ super(ProcessExecutor, self).__init__(*args, **kwargs)
+ self._plugin_manager = plugin_manager
+
+ # Optional list of additional directories that should be added to
+ # subprocesses python path
+ self._python_path = python_path or []
+
+ # Flag that denotes whether this executor has been stopped
+ self._stopped = False
+
+ # Contains reference to all currently running tasks
+ self._tasks = {}
+
+ self._request_handlers = {
+ 'started': self._handle_task_started_request,
+ 'succeeded': self._handle_task_succeeded_request,
+ 'failed': self._handle_task_failed_request,
+ }
+
+ # Server socket used to accept task status messages from subprocesses
+ self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._server_socket.bind(('localhost', 0))
+ self._server_socket.listen(10)
+ self._server_port = self._server_socket.getsockname()[1]
+
+ # Used to send a "closed" message to the listener when this executor is closed
+ self._messenger = _Messenger(task_id=None, port=self._server_port)
+
+        # Queue object used by the listener thread to notify this constructor it has started
+ # (see last line of this __init__ method)
+ self._listener_started = Queue.Queue()
+
+ # Listener thread to handle subprocesses task status messages
+ self._listener_thread = threading.Thread(target=self._listener)
+ self._listener_thread.daemon = True
+ self._listener_thread.start()
+
+ # Wait for listener thread to actually start before returning
+ self._listener_started.get(timeout=60)
+
+ def close(self):
+ if self._stopped:
+ return
+ self._stopped = True
+ # Listener thread may be blocked on "accept" call. This will wake it up with an explicit
+ # "closed" message
+ self._messenger.closed()
+ self._server_socket.close()
+ self._listener_thread.join(timeout=60)
+
+ # we use set(self._tasks) since tasks may change in the process of closing
+ for task_id in set(self._tasks):
+ self.terminate(task_id)
+
+ def terminate(self, task_id):
+ task = self._remove_task(task_id)
+ # The process might have managed to finish, thus it would not be in the tasks list
+ if task:
+ try:
+ parent_process = psutil.Process(task.proc.pid)
+ for child_process in reversed(parent_process.children(recursive=True)):
+ try:
+ child_process.kill()
+ except BaseException:
+ pass
+ parent_process.kill()
+ except BaseException:
+ pass
+
+ def _execute(self, ctx):
+ self._check_closed()
+
+ # Temporary file used to pass arguments to the started subprocess
+ file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
+ os.close(file_descriptor)
+ with open(arguments_json_path, 'wb') as f:
+ f.write(pickle.dumps(self._create_arguments_dict(ctx)))
+
+ env = self._construct_subprocess_env(task=ctx.task)
+ # Asynchronously start the operation in a subprocess
+ proc = subprocess.Popen(
+ [
+ sys.executable,
+ os.path.expanduser(os.path.expandvars(__file__)),
+ os.path.expanduser(os.path.expandvars(arguments_json_path))
+ ],
+ env=env)
+
+ self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
+
+ def _remove_task(self, task_id):
+ return self._tasks.pop(task_id, None)
+
+ def _check_closed(self):
+ if self._stopped:
+ raise RuntimeError('Executor closed')
+
+ def _create_arguments_dict(self, ctx):
+ return {
+ 'task_id': ctx.task.id,
+ 'function': ctx.task.function,
+ 'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.itervalues()),
+ 'port': self._server_port,
+ 'context': ctx.serialization_dict
+ }
+
+ def _construct_subprocess_env(self, task):
+ env = os.environ.copy()
+
+ if task.plugin_fk and self._plugin_manager:
+ # If this is a plugin operation,
+ # load the plugin on the subprocess env we're constructing
+ self._plugin_manager.load_plugin(task.plugin, env=env)
+
+ # Add user supplied directories to injected PYTHONPATH
+ if self._python_path:
+ process_utils.append_to_pythonpath(*self._python_path, env=env)
+
+ return env
+
+ def _listener(self):
+ # Notify __init__ method this thread has actually started
+ self._listener_started.put(True)
+ while not self._stopped:
+ try:
+ with self._accept_request() as (request, response):
+ request_type = request['type']
+ if request_type == 'closed':
+ break
+ request_handler = self._request_handlers.get(request_type)
+ if not request_handler:
+ raise RuntimeError('Invalid request type: {0}'.format(request_type))
+ task_id = request['task_id']
+ request_handler(task_id=task_id, request=request, response=response)
+ except BaseException as e:
+ self.logger.debug('Error in process executor listener: {0}'.format(e))
+
+ @contextlib.contextmanager
+ def _accept_request(self):
+ with contextlib.closing(self._server_socket.accept()[0]) as connection:
+ message = _recv_message(connection)
+ response = {}
+ try:
+ yield message, response
+ except BaseException as e:
+ response['exception'] = exceptions.wrap_if_needed(e)
+ raise
+ finally:
+ _send_message(connection, response)
+
+ def _handle_task_started_request(self, task_id, **kwargs):
+ self._task_started(self._tasks[task_id].ctx)
+
+ def _handle_task_succeeded_request(self, task_id, **kwargs):
+ task = self._remove_task(task_id)
+ if task:
+ self._task_succeeded(task.ctx)
+
+ def _handle_task_failed_request(self, task_id, request, **kwargs):
+ task = self._remove_task(task_id)
+ if task:
+ self._task_failed(
+ task.ctx, exception=request['exception'], traceback=request['traceback'])
+
+
+def _send_message(connection, message):
+
+ # Packing the length of the entire msg using struct.pack.
+ # This enables later reading of the content.
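+    # The resulting wire format is a struct-packed unsigned int ('I', typically
+    # 4 bytes) holding the payload length, followed by the jsonpickle payload.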
+ def _pack(data):
+ return struct.pack(_INT_FMT, len(data))
+
+ data = jsonpickle.dumps(message)
+ msg_metadata = _pack(data)
+ connection.send(msg_metadata)
+ connection.sendall(data)
+
+
+def _recv_message(connection):
+ # Retrieving the length of the msg to come.
+ def _unpack(conn):
+ return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE))[0]
+
+ msg_metadata_len = _unpack(connection)
+ msg = _recv_bytes(connection, msg_metadata_len)
+ return jsonpickle.loads(msg)
+
+
+def _recv_bytes(connection, count):
+ result = io.BytesIO()
+ while True:
+ if not count:
+ return result.getvalue()
+ read = connection.recv(count)
+ if not read:
+ return result.getvalue()
+ result.write(read)
+ count -= len(read)
+
+
+class _Messenger(object):
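+    """
+    Used by the subprocess to report task lifecycle events ('started',
+    'succeeded', 'failed') back to the parent's listener socket, and by the
+    parent itself to send the 'closed' wake-up message.
+    """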
+
+ def __init__(self, task_id, port):
+ self.task_id = task_id
+ self.port = port
+
+ def started(self):
+ """Task started message"""
+ self._send_message(type='started')
+
+ def succeeded(self):
+ """Task succeeded message"""
+ self._send_message(type='succeeded')
+
+ def failed(self, exception):
+ """Task failed message"""
+ self._send_message(type='failed', exception=exception)
+
+ def closed(self):
+ """Executor closed message"""
+ self._send_message(type='closed')
+
+ def _send_message(self, type, exception=None):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('localhost', self.port))
+ try:
+ _send_message(sock, {
+ 'type': type,
+ 'task_id': self.task_id,
+ 'exception': exceptions.wrap_if_needed(exception),
+ 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
+ })
+ response = _recv_message(sock)
+ response_exception = response.get('exception')
+ if response_exception:
+ raise response_exception
+ finally:
+ sock.close()
+
+
+def _main():
+ arguments_json_path = sys.argv[1]
+ with open(arguments_json_path) as f:
+ arguments = pickle.loads(f.read())
+
+    # arguments_json_path is a temporary file created by the parent process,
+    # so we remove it here.
+ os.remove(arguments_json_path)
+
+ task_id = arguments['task_id']
+ port = arguments['port']
+ messenger = _Messenger(task_id=task_id, port=port)
+
+ function = arguments['function']
+ operation_arguments = arguments['operation_arguments']
+ context_dict = arguments['context']
+
+ try:
+ ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context'])
+ except BaseException as e:
+ messenger.failed(e)
+ return
+
+ try:
+ messenger.started()
+ task_func = imports.load_attribute(function)
+ aria.install_aria_extensions()
+ for decorate in process_executor.decorate():
+ task_func = decorate(task_func)
+ task_func(ctx=ctx, **operation_arguments)
+ ctx.close()
+ messenger.succeeded()
+ except BaseException as e:
+ ctx.close()
+ messenger.failed(e)
+
+if __name__ == '__main__':
+ _main()
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py
new file mode 100644
index 0000000..170620e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Thread task executor.
+"""
+
+import Queue
+import threading
+
+import sys
+
+from aria.utils import imports, exceptions
+
+from .base import BaseExecutor
+
+
+class ThreadExecutor(BaseExecutor):
+ """
+ Thread task executor.
+
+ It's easier writing tests using this executor rather than the full-blown sub-process executor.
+
+ Note: This executor is incapable of running plugin operations.
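+
+    A minimal usage sketch (``op_ctx`` stands for an operation context prepared
+    by the orchestrator; the name is illustrative)::
+
+        executor = ThreadExecutor(pool_size=2)
+        try:
+            executor.execute(op_ctx)
+        finally:
+            executor.close()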
+ """
+
+ def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
+ super(ThreadExecutor, self).__init__(*args, **kwargs)
+ self._stopped = False
+ self._close_timeout = close_timeout
+ self._queue = Queue.Queue()
+ self._pool = []
+ for i in range(pool_size):
+ name = 'ThreadExecutor-{index}'.format(index=i+1)
+ thread = threading.Thread(target=self._processor, name=name)
+ thread.daemon = True
+ thread.start()
+ self._pool.append(thread)
+
+ def _execute(self, ctx):
+ self._queue.put(ctx)
+
+ def close(self):
+ self._stopped = True
+ for thread in self._pool:
+ if self._close_timeout is None:
+ thread.join()
+ else:
+ thread.join(self._close_timeout)
+
+ def _processor(self):
+ while not self._stopped:
+ try:
+ ctx = self._queue.get(timeout=1)
+ self._task_started(ctx)
+ try:
+ task_func = imports.load_attribute(ctx.task.function)
+ arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues())
+ task_func(ctx=ctx, **arguments)
+ self._task_succeeded(ctx)
+ except BaseException as e:
+ self._task_failed(ctx,
+ exception=e,
+ traceback=exceptions.get_exception_as_string(*sys.exc_info()))
+            # The timed Queue.get raises when the queue stays empty; since these
+            # are daemon threads we simply loop back and re-check the stop flag
+            except BaseException:
+                pass