Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows')
24 files changed, 2549 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py
new file mode 100644
index 0000000..1f6c368
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflows package.
+"""
+
+# Import required so that logging signals are registered
+from . import events_logging
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py
new file mode 100644
index 0000000..587eee3
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow API.
+"""
+
+from . import task, task_graph
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py
new file mode 100644
index 0000000..6ce4a00
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides the tasks to be entered into the task graph
+"""
+
+from ... import context
+from ....modeling import models
+from ....modeling import utils as modeling_utils
+from ....utils.uuid import generate_uuid
+from .. import exceptions
+
+
+class BaseTask(object):
+    """
+    Base class for tasks.
+    """
+
+    def __init__(self, ctx=None, **kwargs):
+        if ctx is not None:
+            self._workflow_context = ctx
+        else:
+            self._workflow_context = context.workflow.current.get()
+        self._id = generate_uuid(variant='uuid')
+
+    @property
+    def id(self):
+        """
+        UUID4 ID.
+        """
+        return self._id
+
+    @property
+    def workflow_context(self):
+        """
+        Context of the current workflow.
+        """
+        return self._workflow_context
+
+
+class OperationTask(BaseTask):
+    """
+    Executes an operation.
+
+    :ivar name: formatted name (includes actor type, actor name, and interface/operation names)
+    :vartype name: basestring
+    :ivar actor: node or relationship
+    :vartype actor: :class:`~aria.modeling.models.Node` or
+     :class:`~aria.modeling.models.Relationship`
+    :ivar interface_name: interface name on actor
+    :vartype interface_name: basestring
+    :ivar operation_name: operation name on interface
+    :vartype operation_name: basestring
+    :ivar plugin: plugin (or None for default plugin)
+    :vartype plugin: :class:`~aria.modeling.models.Plugin`
+    :ivar function: path to Python function
+    :vartype function: basestring
+    :ivar arguments: arguments to send to Python function
+    :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`}
+    :ivar ignore_failure: whether to ignore failures
+    :vartype ignore_failure: bool
+    :ivar max_attempts: maximum number of attempts allowed in case of failure
+    :vartype max_attempts: int
+    :ivar retry_interval: interval between retries (in seconds)
+    :vartype retry_interval: float
+    """
+
+    NAME_FORMAT = '{interface}:{operation}@{type}:{name}'
+
+    def __init__(self,
+                 actor,
+                 interface_name,
+                 operation_name,
+                 arguments=None,
+                 ignore_failure=None,
+                 max_attempts=None,
+                 retry_interval=None):
+        """
+        :param actor: node or relationship
+        :type actor: :class:`~aria.modeling.models.Node` or
+         :class:`~aria.modeling.models.Relationship`
+        :param interface_name: interface name on actor
+        :type interface_name: basestring
+        :param operation_name: operation name on interface
+        :type operation_name: basestring
+        :param arguments: override argument values
+        :type arguments: {:obj:`basestring`: object}
+        :param ignore_failure: override whether to ignore failures
+        :type ignore_failure: bool
+        :param max_attempts: override maximum number of attempts allowed in case of failure
+        :type max_attempts: int
+        :param retry_interval: override interval between retries (in seconds)
+        :type retry_interval: float
+        :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if
+         ``interface_name`` and ``operation_name`` do not refer to an operation on the actor
+        """
+
+        # Creating OperationTask directly should raise an error when there is no
+        # interface/operation.
+        if not has_operation(actor, interface_name, operation_name):
+            raise exceptions.OperationNotFoundException(
+                'Could not find operation "{operation_name}" on interface '
+                '"{interface_name}" for {actor_type} "{actor.name}"'.format(
+                    operation_name=operation_name,
+                    interface_name=interface_name,
+                    actor_type=type(actor).__name__.lower(),
+                    actor=actor)
+            )
+
+        super(OperationTask, self).__init__()
+
+        self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
+                                                     name=actor.name,
+                                                     interface=interface_name,
+                                                     operation=operation_name)
+        self.actor = actor
+        self.interface_name = interface_name
+        self.operation_name = operation_name
+        self.ignore_failure = \
+            self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
+        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+        self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+
+        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+        self.plugin = operation.plugin
+        self.function = operation.function
+        self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments)
+
+        actor = self.actor
+        if hasattr(actor, '_wrapped'):
+            # Unwrap instrumented model
+            actor = actor._wrapped
+
+        if isinstance(actor, models.Node):
+            self._context_cls = context.operation.NodeOperationContext
+        elif isinstance(actor, models.Relationship):
+            self._context_cls = context.operation.RelationshipOperationContext
+        else:
+            raise exceptions.TaskCreationException('Could not create valid context for '
+                                                   '{actor.__class__}'.format(actor=actor))
+
+    def __repr__(self):
+        return self.name
+
+
+class StubTask(BaseTask):
+    """
+    Enables creating empty tasks.
+    """
+
+
+class WorkflowTask(BaseTask):
+    """
+    Executes a complete workflow.
+    """
+
+    def __init__(self, workflow_func, **kwargs):
+        """
+        :param workflow_func: function to run
+        :param kwargs: kwargs that would be passed to the workflow_func
+        """
+        super(WorkflowTask, self).__init__(**kwargs)
+        kwargs['ctx'] = self.workflow_context
+        self._graph = workflow_func(**kwargs)
+
+    @property
+    def graph(self):
+        """
+        Graph constructed by the sub workflow.
+        """
+        return self._graph
+
+    def __getattr__(self, item):
+        try:
+            return getattr(self._graph, item)
+        except AttributeError:
+            return super(WorkflowTask, self).__getattribute__(item)
+
+
+def create_task(actor, interface_name, operation_name, **kwargs):
+    """
+    Helper function that enables safe creation of :class:`OperationTask`. If the supplied interface
+    or operation do not exist, ``None`` is returned.
+
+    :param actor: actor for this task
+    :param interface_name: name of the interface
+    :param operation_name: name of the operation
+    :param kwargs: any additional kwargs to be passed to the OperationTask
+    :return: OperationTask or None (if the interface/operation does not exist)
+    """
+    try:
+        return OperationTask(
+            actor,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            **kwargs
+        )
+    except exceptions.OperationNotFoundException:
+        return None
+
+
+def create_relationships_tasks(
+        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
+    """
+    Creates relationship tasks (source and target) for all of a node's outbound relationships.
+
+    :param node: source node
+    :param basestring interface_name: name of the interface
+    :param source_operation_name: operation name on the source node
+    :param target_operation_name: operation name on the target node
+    """
+    sub_tasks = []
+    for relationship in node.outbound_relationships:
+        relationship_operations = create_relationship_tasks(
+            relationship,
+            interface_name,
+            source_operation_name=source_operation_name,
+            target_operation_name=target_operation_name,
+            **kwargs)
+        sub_tasks.append(relationship_operations)
+    return sub_tasks
+
+
+def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
+                              target_operation_name=None, **kwargs):
+    """
+    Creates relationship tasks (source and target).
+
+    :param relationship: relationship instance itself
+    :param basestring interface_name: name of the interface
+    :param source_operation_name: operation name on the source node
+    :param target_operation_name: operation name on the target node
+    """
+    operations = []
+    if source_operation_name:
+        operations.append(
+            create_task(
+                relationship,
+                interface_name=interface_name,
+                operation_name=source_operation_name,
+                **kwargs
+            )
+        )
+    if target_operation_name:
+        operations.append(
+            create_task(
+                relationship,
+                interface_name=interface_name,
+                operation_name=target_operation_name,
+                **kwargs
+            )
+        )
+
+    return [o for o in operations if o]
+
+
+def has_operation(actor, interface_name, operation_name):
+    interface = actor.interfaces.get(interface_name, None)
+    return interface and interface.operations.get(operation_name, False)
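A usage sketch (not part of the patch): `create_task` swallows `OperationNotFoundException` and returns ``None``, while constructing `OperationTask` directly raises. The 'Standard'/'configure' names below are illustrative.

    from aria import workflow
    from aria.orchestrator.workflows.api import task

    @workflow
    def maybe_configure(ctx, graph, **kwargs):
        for node in ctx.nodes:
            # Returns None (instead of raising) when the node does not
            # implement Standard:configure, so such nodes are skipped
            configure = task.create_task(node, 'Standard', 'configure')
            if configure is not None:
                graph.add_tasks(configure)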
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py
new file mode 100644
index 0000000..900a0d1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/api/task_graph.py
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task graph.
+"""
+
+from collections import Iterable
+
+from networkx import DiGraph, topological_sort
+
+from ....utils.uuid import generate_uuid
+from . import task as api_task
+
+
+class TaskNotInGraphError(Exception):
+    """
+    An error representing a scenario where a given task is not in the graph as expected.
+    """
+    pass
+
+
+def _filter_out_empty_tasks(func=None):
+    if func is None:
+        return lambda f: _filter_out_empty_tasks(func=f)
+
+    def _wrapper(task, *tasks, **kwargs):
+        return func(*(t for t in (task,) + tuple(tasks) if t), **kwargs)
+    return _wrapper
+
+
+class TaskGraph(object):
+    """
+    Task graph builder.
+ """ + + def __init__(self, name): + self.name = name + self._id = generate_uuid(variant='uuid') + self._graph = DiGraph() + + def __repr__(self): + return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format( + name=self.__class__.__name__, self=self) + + @property + def id(self): + """ + ID of the graph + """ + return self._id + + # graph traversal methods + + @property + def tasks(self): + """ + Iterator over tasks in the graph. + """ + for _, data in self._graph.nodes_iter(data=True): + yield data['task'] + + def topological_order(self, reverse=False): + """ + Topological sort of the graph. + + :param reverse: whether to reverse the sort + :return: list which represents the topological sort + """ + for task_id in topological_sort(self._graph, reverse=reverse): + yield self.get_task(task_id) + + def get_dependencies(self, dependent_task): + """ + Iterates over the task's dependencies. + + :param dependent_task: task whose dependencies are requested + :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if + ``dependent_task`` is not in the graph + """ + if not self.has_tasks(dependent_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id)) + for _, dependency_id in self._graph.out_edges_iter(dependent_task.id): + yield self.get_task(dependency_id) + + def get_dependents(self, dependency_task): + """ + Iterates over the task's dependents. + + :param dependency_task: task whose dependents are requested + :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if + ``dependency_task`` is not in the graph + """ + if not self.has_tasks(dependency_task): + raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id)) + for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id): + yield self.get_task(dependent_id) + + # task methods + + def get_task(self, task_id): + """ + Get a task instance that's been inserted to the graph by the task's ID. + + :param basestring task_id: task ID + :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if no task found in + the graph with the given ID + """ + if not self._graph.has_node(task_id): + raise TaskNotInGraphError('Task id: {0}'.format(task_id)) + data = self._graph.node[task_id] + return data['task'] + + @_filter_out_empty_tasks + def add_tasks(self, *tasks): + """ + Adds a task to the graph. + + :param task: task + :return: list of added tasks + :rtype: list + """ + assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks]) + return_tasks = [] + + for task in tasks: + if isinstance(task, Iterable): + return_tasks += self.add_tasks(*task) + elif not self.has_tasks(task): + self._graph.add_node(task.id, task=task) + return_tasks.append(task) + + return return_tasks + + @_filter_out_empty_tasks + def remove_tasks(self, *tasks): + """ + Removes the provided task from the graph. + + :param task: task + :return: list of removed tasks + :rtype: list + """ + return_tasks = [] + + for task in tasks: + if isinstance(task, Iterable): + return_tasks += self.remove_tasks(*task) + elif self.has_tasks(task): + self._graph.remove_node(task.id) + return_tasks.append(task) + + return return_tasks + + @_filter_out_empty_tasks + def has_tasks(self, *tasks): + """ + Checks whether a task is in the graph. 
+
+        :param tasks: tasks (or nested iterables of tasks)
+        :return: ``True`` if all tasks are in the graph, otherwise ``False``
+        :rtype: bool
+        """
+        assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks)
+        return_value = True
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_value &= self.has_tasks(*task)
+            else:
+                return_value &= self._graph.has_node(task.id)
+
+        return return_value
+
+    def add_dependency(self, dependent, dependency):
+        """
+        Adds a dependency for one item (task, sequence or parallel) on another.
+
+        The dependent will only be executed after the dependency terminates. If either of the items
+        is a sequence or a parallel, multiple dependencies may be added.
+
+        :param dependent: dependent (task, sequence or parallel)
+        :param dependency: dependency (task, sequence or parallel)
+        :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+         dependent or the dependency are tasks which are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.add_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    self.add_dependency(dependent, dependency_task)
+            else:
+                self._graph.add_edge(dependent.id, dependency.id)
+
+    def has_dependency(self, dependent, dependency):
+        """
+        Checks whether one item (task, sequence or parallel) depends on another.
+
+        Note that if either of the items is a sequence or a parallel, and some of the
+        dependencies exist in the graph but not all of them, this method will return ``False``.
+
+        :param dependent: dependent (task, sequence or parallel)
+        :param dependency: dependency (task, sequence or parallel)
+        :return: ``True`` if the dependency between the two exists, otherwise ``False``
+        :rtype: bool
+        :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+         dependent or the dependency are tasks which are not in the graph
+        """
+        if not (dependent and dependency):
+            return False
+        elif not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        return_value = True
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                return_value &= self.has_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    return_value &= self.has_dependency(dependent, dependency_task)
+            else:
+                return_value &= self._graph.has_edge(dependent.id, dependency.id)
+
+        return return_value
+
+    def remove_dependency(self, dependent, dependency):
+        """
+        Removes a dependency of one item (task, sequence or parallel) on another.
+
+        Note that if either of the items is a sequence or a parallel, and only some of the
+        dependencies exist in the graph, this method will not remove any of the dependencies.
+
+        :param dependent: dependent (task, sequence or parallel)
+        :param dependency: dependency (task, sequence or parallel)
+        :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the
+         dependent or the dependency are tasks which are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if not self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.remove_dependency(dependent_task, dependency)
+        elif isinstance(dependency, Iterable):
+            for dependency_task in dependency:
+                self.remove_dependency(dependent, dependency_task)
+        else:
+            self._graph.remove_edge(dependent.id, dependency.id)
+
+    @_filter_out_empty_tasks
+    def sequence(self, *tasks):
+        """
+        Creates and inserts a sequence into the graph; effectively, each task i depends on i-1.
+
+        :param tasks: iterable of dependencies
+        :return: provided tasks
+        """
+        if tasks:
+            self.add_tasks(*tasks)
+
+            for i in xrange(1, len(tasks)):
+                self.add_dependency(tasks[i], tasks[i-1])
+
+        return tasks
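A sketch of how the builder composes (inside a workflow function, where a current workflow context exists; names are illustrative): `add_tasks` and `add_dependency` accept single tasks or nested iterables, and `sequence` chains each task onto the previous one.

    from aria import workflow
    from aria.orchestrator.workflows.api import task

    @workflow
    def demo(ctx, graph, **kwargs):
        a, b, c = task.StubTask(), task.StubTask(), task.StubTask()

        graph.sequence(a, b)             # b depends on a
        graph.add_tasks(c)
        graph.add_dependency(c, [a, b])  # c waits for both a and b

        assert graph.has_dependency(b, a)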
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py
new file mode 100644
index 0000000..1b2f390
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/__init__.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in workflows.
+"""
+
+from .install import install
+from .uninstall import uninstall
+from .start import start
+from .stop import stop
+
+
+BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop')
+BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin'
+
+
+__all__ = [
+    'BUILTIN_WORKFLOWS',
+    'install',
+    'uninstall',
+    'start',
+    'stop'
+]
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py
new file mode 100644
index 0000000..949f864
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in operation execution workflow.
+"""
+
+from ... import workflow
+from ..api import task
+
+
+@workflow
+def execute_operation(
+        ctx,
+        graph,
+        interface_name,
+        operation_name,
+        operation_kwargs,
+        run_by_dependency_order,
+        type_names,
+        node_template_ids,
+        node_ids,
+        **kwargs):
+    """
+    Built-in operation execution workflow.
+
+    :param ctx: workflow context
+    :param graph: graph which will describe the workflow
+    :param interface_name: interface name on the nodes
+    :param operation_name: operation name to execute
+    :param operation_kwargs: argument overrides for the operation
+    :param run_by_dependency_order: whether to execute in node dependency order
+    :param type_names: only execute on nodes with these type names (empty matches all)
+    :param node_template_ids: only execute on nodes of these node templates (empty matches all)
+    :param node_ids: only execute on nodes with these IDs (empty matches all)
+    """
+    subgraphs = {}
+    # filtering node instances
+    filtered_nodes = list(_filter_nodes(
+        context=ctx,
+        node_template_ids=node_template_ids,
+        node_ids=node_ids,
+        type_names=type_names))
+
+    if run_by_dependency_order:
+        filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
+        for node in ctx.nodes:
+            if node.id not in filtered_node_ids:
+                subgraphs[node.id] = ctx.task_graph(
+                    name='execute_operation_stub_{0}'.format(node.id))
+
+    # registering actual tasks to sequences
+    for node in filtered_nodes:
+        graph.add_tasks(
+            task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                arguments=operation_kwargs
+            )
+        )
+
+    for _, node_sub_workflow in subgraphs.items():
+        graph.add_tasks(node_sub_workflow)
+
+    # adding tasks dependencies if required
+    if run_by_dependency_order:
+        for node in ctx.nodes:
+            for relationship in node.relationships:
+                graph.add_dependency(
+                    subgraphs[node.id], [subgraphs[relationship.target_id]])
+
+
+def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
+    def _is_node_template_by_id(node_template_id):
+        return not node_template_ids or node_template_id in node_template_ids
+
+    def _is_node_by_id(node_id):
+        return not node_ids or node_id in node_ids
+
+    def _is_node_by_type(node_type):
+        return not type_names or node_type.name in type_names
+
+    for node in context.nodes:
+        if all((_is_node_template_by_id(node.node_template.id),
+                _is_node_by_id(node.id),
+                _is_node_by_type(node.node_template.type))):
+            yield node
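The three `_filter_nodes` predicates are conjunctive, and each matches everything when its filter is empty, so calling the workflow with no filters selects every node. Restated compactly (illustrative only, not part of the patch):

    # Equivalent, compact statement of the filter semantics
    def _matches(node, node_template_ids=(), node_ids=(), type_names=()):
        return all((
            not node_template_ids or node.node_template.id in node_template_ids,
            not node_ids or node.id in node_ids,
            not type_names or node.node_template.type.name in type_names,
        ))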
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
new file mode 100644
index 0000000..07e27b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: skip-file
+
+"""
+Built-in heal workflow.
+"""
+
+from aria import workflow
+
+from .workflows import (install_node, uninstall_node)
+from ..api import task
+
+
+@workflow
+def heal(ctx, graph, node_id):
+    """
+    Built-in heal workflow.
+
+    :param ctx: workflow context
+    :param graph: graph which will describe the workflow
+    :param node_id: ID of the node to heal
+    """
+    failing_node = ctx.model.node.get(node_id)
+    host_node = ctx.model.node.get(failing_node.host.id)
+    failed_node_subgraph = _get_contained_subgraph(ctx, host_node)
+    failed_node_ids = list(n.id for n in failed_node_subgraph)
+
+    targeted_nodes = [node for node in ctx.nodes
+                      if node.id not in failed_node_ids]
+
+    uninstall_subgraph = task.WorkflowTask(
+        heal_uninstall,
+        failing_nodes=failed_node_subgraph,
+        targeted_nodes=targeted_nodes
+    )
+
+    install_subgraph = task.WorkflowTask(
+        heal_install,
+        failing_nodes=failed_node_subgraph,
+        targeted_nodes=targeted_nodes)
+
+    graph.sequence(uninstall_subgraph, install_subgraph)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
+    """
+    Uninstall phase of the heal mechanism.
+
+    :param ctx: workflow context
+    :param graph: task graph to edit
+    :param failing_nodes: failing nodes to heal
+    :param targeted_nodes: targets of the relationships where the failing nodes are
+    """
+    node_sub_workflows = {}
+
+    # Create an uninstall stub workflow for each unaffected node
+    for node in targeted_nodes:
+        node_stub = task.StubTask()
+        node_sub_workflows[node.id] = node_stub
+        graph.add_tasks(node_stub)
+
+    # Create an uninstall sub-workflow for every failing node
+    for node in failing_nodes:
+        node_sub_workflow = task.WorkflowTask(uninstall_node,
+                                              node=node)
+        node_sub_workflows[node.id] = node_sub_workflow
+        graph.add_tasks(node_sub_workflow)
+
+    # Create dependencies between the node sub-workflows
+    for node in failing_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+        for relationship in reversed(node.outbound_relationships):
+            graph.add_dependency(
+                node_sub_workflows[relationship.target_node.id],
+                node_sub_workflow)
+
+    # Add operations for intact nodes depending on a node belonging to failing_nodes
+    for node in targeted_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+
+        for relationship in reversed(node.outbound_relationships):
+
+            target_node = \
+                ctx.model.node.get(relationship.target_node.id)
+            target_node_subgraph = node_sub_workflows[target_node.id]
+            graph.add_dependency(target_node_subgraph, node_sub_workflow)
+
+            if target_node in failing_nodes:
+                # Assumption: the relationship lifecycle interface exposes 'unlink'
+                # on both the source and target ends
+                dependency = task.create_relationship_tasks(
+                    relationship=relationship,
+                    interface_name='aria.interfaces.relationship_lifecycle',
+                    source_operation_name='unlink',
+                    target_operation_name='unlink')
+                graph.add_tasks(*dependency)
+                graph.add_dependency(node_sub_workflow, dependency)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_install(ctx, graph, failing_nodes, targeted_nodes):
+    """
+    Install phase of the heal mechanism.
+
+    :param ctx: workflow context
+    :param graph: task graph to edit
+    :param failing_nodes: failing nodes to heal
+    :param targeted_nodes: targets of the relationships where the failing nodes are
+    """
+    node_sub_workflows = {}
+
+    # Create an install stub workflow for each unaffected node
+    for node in targeted_nodes:
+        node_stub = task.StubTask()
+        node_sub_workflows[node.id] = node_stub
+        graph.add_tasks(node_stub)
+
+    # Create an install sub-workflow for every failing node
+    for node in failing_nodes:
+        node_sub_workflow = task.WorkflowTask(install_node,
+                                              node=node)
+        node_sub_workflows[node.id] = node_sub_workflow
+        graph.add_tasks(node_sub_workflow)
+
+    # Create dependencies between the node sub-workflows
+    for node in failing_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+        if node.outbound_relationships:
+            dependencies = \
+                [node_sub_workflows[relationship.target_node.id]
+                 for relationship in node.outbound_relationships]
+            graph.add_dependency(node_sub_workflow, dependencies)
+
+    # Add operations for intact nodes depending on a node
+    # belonging to failing_nodes
+    for node in targeted_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+
+        for relationship in node.outbound_relationships:
+            target_node = ctx.model.node.get(
+                relationship.target_node.id)
+            target_node_subworkflow = node_sub_workflows[target_node.id]
+            graph.add_dependency(node_sub_workflow, target_node_subworkflow)
+
+            if target_node in failing_nodes:
+                # Assumption: the relationship lifecycle interface exposes 'establish'
+                # on both the source and target ends
+                dependent = task.create_relationship_tasks(
+                    relationship=relationship,
+                    interface_name='aria.interfaces.relationship_lifecycle',
+                    source_operation_name='establish',
+                    target_operation_name='establish')
+                graph.add_tasks(*dependent)
+                graph.add_dependency(dependent, node_sub_workflow)
+
+
+def _get_contained_subgraph(context, host_node):
+    contained_instances = [node
+                           for node in context.nodes
+                           if node.host_fk == host_node.id and
+                           node.host_fk != node.id]
+    result = [host_node]
+
+    if not contained_instances:
+        return result
+
+    result.extend(contained_instances)
+    for node in contained_instances:
+        result.extend(_get_contained_subgraph(context, node))
+
+    return set(result)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py
new file mode 100644
index 0000000..1e7c531
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/install.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Built-in install workflow.
+"""
+
+from ... import workflow
+from ..api import task as api_task
+from . import workflows
+
+
+@workflow
+def install(ctx, graph):
+    """
+    Built-in install workflow.
+ """ + tasks_and_nodes = [] + for node in ctx.nodes: + tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node)) + graph.add_tasks([task for task, _ in tasks_and_nodes]) + workflows.create_node_task_dependencies(graph, tasks_and_nodes) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py new file mode 100644 index 0000000..c02a26d --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/start.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Built-in start workflow. +""" + +from .workflows import start_node +from ... import workflow +from ..api import task as api_task + + +@workflow +def start(ctx, graph): + """ + Built-in start workflow. + """ + for node in ctx.model.node.iter(): + graph.add_tasks(api_task.WorkflowTask(start_node, node=node)) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py new file mode 100644 index 0000000..6f9930b --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/stop.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Built-in stop workflow. +""" + +from .workflows import stop_node +from ..api import task as api_task +from ... import workflow + + +@workflow +def stop(ctx, graph): + """ + Built-in stop workflow. 
+ """ + for node in ctx.model.node.iter(): + graph.add_tasks(api_task.WorkflowTask(stop_node, node=node)) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py new file mode 100644 index 0000000..7925f4b --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/uninstall.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Built-in uninstall workflow. +""" + +from ... import workflow +from ..api import task as api_task +from . import workflows + + +@workflow +def uninstall(ctx, graph): + """ + Built-in uninstall workflow. + """ + tasks_and_nodes = [] + for node in ctx.nodes: + tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node)) + graph.add_tasks([task for task, _ in tasks_and_nodes]) + workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py new file mode 100644 index 0000000..b286e98 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/workflows.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TSOCA normative lifecycle workflows. +""" + +from ... 
+from ... import workflow
+from ..api import task
+
+
+NORMATIVE_STANDARD_INTERFACE = 'Standard'  # 'tosca.interfaces.node.lifecycle.Standard'
+NORMATIVE_CONFIGURE_INTERFACE = 'Configure'  # 'tosca.interfaces.relationship.Configure'
+
+NORMATIVE_CREATE = 'create'
+NORMATIVE_CONFIGURE = 'configure'
+NORMATIVE_START = 'start'
+NORMATIVE_STOP = 'stop'
+NORMATIVE_DELETE = 'delete'
+
+NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source'
+NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target'
+NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source'
+NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target'
+
+NORMATIVE_ADD_SOURCE = 'add_source'
+NORMATIVE_ADD_TARGET = 'add_target'
+NORMATIVE_REMOVE_TARGET = 'remove_target'
+NORMATIVE_REMOVE_SOURCE = 'remove_source'
+NORMATIVE_TARGET_CHANGED = 'target_changed'
+
+
+__all__ = (
+    'NORMATIVE_STANDARD_INTERFACE',
+    'NORMATIVE_CONFIGURE_INTERFACE',
+    'NORMATIVE_CREATE',
+    'NORMATIVE_START',
+    'NORMATIVE_STOP',
+    'NORMATIVE_DELETE',
+    'NORMATIVE_CONFIGURE',
+    'NORMATIVE_PRE_CONFIGURE_SOURCE',
+    'NORMATIVE_PRE_CONFIGURE_TARGET',
+    'NORMATIVE_POST_CONFIGURE_SOURCE',
+    'NORMATIVE_POST_CONFIGURE_TARGET',
+    'NORMATIVE_ADD_SOURCE',
+    'NORMATIVE_ADD_TARGET',
+    'NORMATIVE_REMOVE_SOURCE',
+    'NORMATIVE_REMOVE_TARGET',
+    'NORMATIVE_TARGET_CHANGED',
+    'install_node',
+    'uninstall_node',
+    'start_node',
+    'stop_node',
+)
+
+
+@workflow(suffix_template='{node.name}')
+def install_node(graph, node, **kwargs):
+    # Create
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+
+    # Configure
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_PRE_CONFIGURE_SOURCE,
+                                                NORMATIVE_PRE_CONFIGURE_TARGET)
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_POST_CONFIGURE_SOURCE,
+                                                NORMATIVE_POST_CONFIGURE_TARGET)
+    # Start
+    sequence += _create_start_tasks(node)
+
+    graph.sequence(*sequence)
+
+
+@workflow(suffix_template='{node.name}')
+def uninstall_node(graph, node, **kwargs):
+    # Stop
+    sequence = _create_stop_tasks(node)
+
+    # Delete
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
+
+    graph.sequence(*sequence)
+
+
+@workflow(suffix_template='{node.name}')
+def start_node(graph, node, **kwargs):
+    graph.sequence(*_create_start_tasks(node))
+
+
+@workflow(suffix_template='{node.name}')
+def stop_node(graph, node, **kwargs):
+    graph.sequence(*_create_stop_tasks(node))
+
+
+def _create_start_tasks(node):
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
+    return sequence
+
+
+def _create_stop_tasks(node):
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
+    return sequence
+
+
+def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
+    """
+    Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
+    """
+
+    def get_task(node_name):
+        for api_task, task_node in tasks_and_nodes:
+            if task_node.name == node_name:
+                return api_task
+        return None
+
+    for api_task, node in tasks_and_nodes:
+        dependencies = []
+        for relationship in node.outbound_relationships:
+            dependency = get_task(relationship.target_node.name)
+            if dependency:
+                dependencies.append(dependency)
+        if dependencies:
+            if reverse:
+                for dependency in dependencies:
+                    graph.add_dependency(dependency, api_task)
+            else:
+                graph.add_dependency(api_task, dependencies)
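For a node with no relationships, `install_node` reduces to the Standard lifecycle chain create, configure, start; `create_task` returns ``None`` for operations the node does not implement, and `graph.sequence` (wrapped with `_filter_out_empty_tasks`) drops those entries. An equivalent explicit sketch (illustrative, not from the patch):

    from aria import workflow
    from aria.orchestrator.workflows.api import task

    @workflow
    def install_single(ctx, graph, node, **kwargs):
        steps = [task.create_task(node, 'Standard', op)
                 for op in ('create', 'configure', 'start')]
        # None entries (unimplemented operations) are filtered out by sequence()
        graph.sequence(*steps)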
+ """ + + def get_task(node_name): + for api_task, task_node in tasks_and_nodes: + if task_node.name == node_name: + return api_task + return None + + for api_task, node in tasks_and_nodes: + dependencies = [] + for relationship in node.outbound_relationships: + dependency = get_task(relationship.target_node.name) + if dependency: + dependencies.append(dependency) + if dependencies: + if reverse: + for dependency in dependencies: + graph.add_dependency(dependency, api_task) + else: + graph.add_dependency(api_task, dependencies) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..3f28136 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow core. +""" + +from . import engine diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py new file mode 100644 index 0000000..0ec3cd8 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py @@ -0,0 +1,185 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow execution. +""" + +import time +from datetime import datetime + +from aria import logger +from aria.modeling import models +from aria.orchestrator import events +from aria.orchestrator.context import operation + +from .. import exceptions +from ..executor.base import StubTaskExecutor +# Import required so all signals are registered +from . import events_handler # pylint: disable=unused-import + + +class Engine(logger.LoggerMixin): + """ + Executes workflows. 
+ """ + + def __init__(self, executors, **kwargs): + super(Engine, self).__init__(**kwargs) + self._executors = executors.copy() + self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) + + def execute(self, ctx, resuming=False, retry_failed=False): + """ + Executes the workflow. + """ + if resuming: + events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed) + + tasks_tracker = _TasksTracker(ctx) + + try: + events.start_workflow_signal.send(ctx) + while True: + cancel = self._is_cancel(ctx) + if cancel: + break + for task in tasks_tracker.ended_tasks: + self._handle_ended_tasks(task) + tasks_tracker.finished(task) + for task in tasks_tracker.executable_tasks: + tasks_tracker.executing(task) + self._handle_executable_task(ctx, task) + if tasks_tracker.all_tasks_consumed: + break + else: + time.sleep(0.1) + if cancel: + self._terminate_tasks(tasks_tracker.executing_tasks) + events.on_cancelled_workflow_signal.send(ctx) + else: + events.on_success_workflow_signal.send(ctx) + except BaseException as e: + # Cleanup any remaining tasks + self._terminate_tasks(tasks_tracker.executing_tasks) + events.on_failure_workflow_signal.send(ctx, exception=e) + raise + + def _terminate_tasks(self, tasks): + for task in tasks: + try: + self._executors[task._executor].terminate(task.id) + except BaseException: + pass + + @staticmethod + def cancel_execution(ctx): + """ + Send a cancel request to the engine. If execution already started, execution status + will be modified to ``cancelling`` status. If execution is in pending mode, execution status + will be modified to ``cancelled`` directly. + """ + events.on_cancelling_workflow_signal.send(ctx) + + @staticmethod + def _is_cancel(ctx): + execution = ctx.model.execution.refresh(ctx.execution) + return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) + + def _handle_executable_task(self, ctx, task): + task_executor = self._executors[task._executor] + + # If the task is a stub, a default context is provided, else it should hold the context cls + context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls + op_ctx = context_cls( + model_storage=ctx.model, + resource_storage=ctx.resource, + workdir=ctx._workdir, + task_id=task.id, + actor_id=task.actor.id if task.actor else None, + service_id=task.execution.service.id, + execution_id=task.execution.id, + name=task.name + ) + + if not task._stub_type: + events.sent_task_signal.send(op_ctx) + task_executor.execute(op_ctx) + + @staticmethod + def _handle_ended_tasks(task): + if task.status == models.Task.FAILED and not task.ignore_failure: + raise exceptions.ExecutorException('Workflow failed') + + +class _TasksTracker(object): + + def __init__(self, ctx): + self._ctx = ctx + + self._tasks = ctx.execution.tasks + self._executed_tasks = [task for task in self._tasks if task.has_ended()] + self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) + self._executing_tasks = [] + + @property + def all_tasks_consumed(self): + return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 + + def executing(self, task): + # Task executing could be retrying (thus removed and added earlier) + if task not in self._executing_tasks: + self._executable_tasks.remove(task) + self._executing_tasks.append(task) + + def finished(self, task): + self._executing_tasks.remove(task) + self._executed_tasks.append(task) + + @property + def ended_tasks(self): + for task in self.executing_tasks: + if task.has_ended(): + 
+                yield task
+
+    @property
+    def executable_tasks(self):
+        now = datetime.utcnow()
+        # We need both lists since retrying tasks are in the executing task list
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+            if all([task.is_waiting(),
+                    task.due_at <= now,
+                    all(dependency in self._executed_tasks for dependency in task.dependencies)
+                   ]):
+                yield task
+
+    @property
+    def executing_tasks(self):
+        for task in self._update_tasks(self._executing_tasks):
+            yield task
+
+    @property
+    def executed_tasks(self):
+        for task in self._update_tasks(self._executed_tasks):
+            yield task
+
+    @property
+    def tasks(self):
+        for task in self._update_tasks(self._tasks):
+            yield task
+
+    def _update_tasks(self, tasks):
+        for task in tasks:
+            yield self._ctx.model.task.refresh(task)
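A rough sketch of driving the engine directly (ARIA's workflow runner normally does this wiring; `workflow_ctx` is assumed to be an already-built workflow context with a compiled execution, and `ThreadExecutor` is assumed to be available under the executor package):

    from aria.orchestrator.workflows.core import engine
    from aria.orchestrator.workflows.executor import thread

    executor = thread.ThreadExecutor()
    try:
        engine.Engine(executors={thread.ThreadExecutor: executor}).execute(workflow_ctx)
    finally:
        executor.close()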
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
new file mode 100644
index 0000000..473475e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow event handling.
+"""
+
+from datetime import (
+    datetime,
+    timedelta,
+)
+
+from ... import events
+from ... import exceptions
+
+
+@events.sent_task_signal.connect
+def _task_sent(ctx, *args, **kwargs):
+    with ctx.persist_changes:
+        ctx.task.status = ctx.task.SENT
+
+
+@events.start_task_signal.connect
+def _task_started(ctx, *args, **kwargs):
+    with ctx.persist_changes:
+        ctx.task.started_at = datetime.utcnow()
+        ctx.task.status = ctx.task.STARTED
+        _update_node_state_if_necessary(ctx, is_transitional=True)
+
+
+@events.on_failure_task_signal.connect
+def _task_failed(ctx, exception, *args, **kwargs):
+    with ctx.persist_changes:
+        should_retry = all([
+            not isinstance(exception, exceptions.TaskAbortException),
+            ctx.task.attempts_count < ctx.task.max_attempts or
+            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+            # The ignore_failure check here means the task will not be retried and will be
+            # marked as failed. The engine also looks at ignore_failure, so it won't fail the
+            # workflow.
+            not ctx.task.ignore_failure
+        ])
+        if should_retry:
+            retry_interval = None
+            if isinstance(exception, exceptions.TaskRetryException):
+                retry_interval = exception.retry_interval
+            if retry_interval is None:
+                retry_interval = ctx.task.retry_interval
+            ctx.task.status = ctx.task.RETRYING
+            ctx.task.attempts_count += 1
+            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+        else:
+            ctx.task.ended_at = datetime.utcnow()
+            ctx.task.status = ctx.task.FAILED
+
+
+@events.on_success_task_signal.connect
+def _task_succeeded(ctx, *args, **kwargs):
+    with ctx.persist_changes:
+        ctx.task.ended_at = datetime.utcnow()
+        ctx.task.status = ctx.task.SUCCESS
+        ctx.task.attempts_count += 1
+
+        _update_node_state_if_necessary(ctx)
+
+
+@events.start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        # The execution may already be in the process of cancelling
+        if execution.status in (execution.CANCELLING, execution.CANCELLED):
+            return
+        execution.status = execution.STARTED
+        execution.started_at = datetime.utcnow()
+
+
+@events.on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        execution.error = str(exception)
+        execution.status = execution.FAILED
+        execution.ended_at = datetime.utcnow()
+
+
+@events.on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        execution.status = execution.SUCCEEDED
+        execution.ended_at = datetime.utcnow()
+
+
+@events.on_cancelled_workflow_signal.connect
+def _workflow_cancelled(workflow_context, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        # The _workflow_cancelling function may have called this function already
+        if execution.status == execution.CANCELLED:
+            return
+        # The execution may have already finished
+        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+        else:
+            execution.status = execution.CANCELLED
+            execution.ended_at = datetime.utcnow()
+
+
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        execution.status = execution.PENDING
+        # Any task that has not ended is put back into pending state
+        for task in execution.tasks:
+            if not task.has_ended():
+                task.status = task.PENDING
+
+        if retry_failed:
+            for task in execution.tasks:
+                if task.status == task.FAILED and not task.ignore_failure:
+                    task.attempts_count = 0
+                    task.status = task.PENDING
+
+
+@events.on_cancelling_workflow_signal.connect
+def _workflow_cancelling(workflow_context, *args, **kwargs):
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        if execution.status == execution.PENDING:
+            return _workflow_cancelled(workflow_context=workflow_context)
+        # The execution may have already finished
+        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+        else:
+            execution.status = execution.CANCELLING
+
+
+def _update_node_state_if_necessary(ctx, is_transitional=False):
+    # TODO: this is not the right way to check! the interface name is arbitrary
+    # and also will *never* be the type name
+    node = ctx.task.node if ctx.task is not None else None
+    if (node is not None) and \
+        (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
+                                     'tosca:Standard')):
+        state = node.determine_state(op_name=ctx.task.operation_name,
+                                     is_transitional=is_transitional)
+        if state:
+            node.state = state
+            ctx.model.node.update(node)
+
+
+def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
+    workflow_context.logger.info(
+        "'{workflow_name}' workflow execution {status} before the cancel request "
+        "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))
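These handlers are ordinary blinker subscribers, so additional listeners can be attached to the same signals without touching this module; a hypothetical audit hook:

    from aria.orchestrator import events

    @events.on_success_task_signal.connect
    def _audit_task_success(ctx, *args, **kwargs):
        # ctx here is the operation context handed to every task signal
        ctx.logger.info('audit: task {0} succeeded'.format(ctx.task.name))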
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
new file mode 100644
index 0000000..81543d5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ....modeling import models
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py new file mode 100644 index 0000000..81543d5 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py @@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ....modeling import models
+from .. import executor, api
+
+
+class GraphCompiler(object):
+ def __init__(self, ctx, default_executor):
+ self._ctx = ctx
+ self._default_executor = default_executor
+ self._stub_executor = executor.base.StubTaskExecutor
+ self._model_to_api_id = {}
+
+ def compile(self,
+ task_graph,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
+ """
+ Translates the user graph into the execution graph.
+
+ :param task_graph: the user's graph
+ :param start_stub_type: internal use
+ :param end_stub_type: internal use
+ :param depends_on: internal use
+ """
+ depends_on = list(depends_on)
+
+ # Insert start marker
+ start_task = self._create_stub_task(
+ start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name,
+ )
+
+ for task in task_graph.topological_order(reverse=True):
+ dependencies = \
+ (self._get_tasks_from_dependencies(task_graph.get_dependencies(task))
+ or [start_task])
+
+ if isinstance(task, api.task.OperationTask):
+ self._create_operation_task(task, dependencies)
+
+ elif isinstance(task, api.task.WorkflowTask):
+ # Build the graph recursively while adding start and end markers
+ # (note: the constant name below matches the misspelled attribute
+ # defined on models.Task)
+ self.compile(
+ task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies
+ )
+ elif isinstance(task, api.task.StubTask):
+ self._create_stub_task(models.Task.STUB, dependencies, task.id)
+ else:
+ raise RuntimeError('Unsupported task type: {0}'.format(type(task).__name__))
+
+ # Insert end marker
+ self._create_stub_task(
+ end_stub_type,
+ self._get_non_dependent_tasks(self._ctx.execution) or [start_task],
+ self._end_graph_suffix(task_graph.id),
+ task_graph.name
+ )
+
+ def _create_stub_task(self, stub_type, dependencies, api_id, name=None):
+ model_task = models.Task(
+ name=name,
+ dependencies=dependencies,
+ execution=self._ctx.execution,
+ _executor=self._stub_executor,
+ _stub_type=stub_type)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_id
+ return model_task
+
+ def _create_operation_task(self, api_task, dependencies):
+ model_task = models.Task.from_api_task(
+ api_task, self._default_executor, dependencies=dependencies)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_task.id
+ return model_task
+
+ @staticmethod
+ def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+ @staticmethod
+ def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
+
+ @staticmethod
+ def _get_non_dependent_tasks(execution):
+ tasks_with_dependencies = set()
+ for task in execution.tasks:
+ tasks_with_dependencies.update(task.dependencies)
+ return list(set(execution.tasks) - tasks_with_dependencies)
+
+ def _get_tasks_from_dependencies(self, dependencies):
+ """
+ Returns the model tasks that correspond to the given api task dependencies.
+ """
+ tasks = []
+ for dependency in dependencies:
+ if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)):
+ dependency_name = dependency.id
+ else:
+ dependency_name = self._end_graph_suffix(dependency.id)
+ tasks.extend(task for task in self._ctx.execution.tasks
+ if self._model_to_api_id.get(task.id, None) == dependency_name)
+ return tasks
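GraphCompiler walks the user graph in reverse topological order and brackets every graph and subworkflow with start/end stub tasks, so the engine can treat a whole subgraph as a single dependency. The pattern is easier to see stripped of ARIA's models; a self-contained toy sketch (all names hypothetical, assumes an acyclic graph):

# Toy illustration of the start/end stub pattern used by GraphCompiler above.
# This does not touch ARIA's models; 'dependencies' maps task -> prerequisite tasks.
def compile_graph(name, tasks, dependencies):
    compiled = [('start', name)]
    done = set()
    pending = list(tasks)
    while pending:  # naive topological ordering
        task = pending.pop(0)
        if any(dep not in done for dep in dependencies.get(task, [])):
            pending.append(task)  # prerequisites not compiled yet; retry later
            continue
        compiled.append(('task', task))
        done.add(task)
    compiled.append(('end', name))
    return compiled

print(compile_graph('install', ['create', 'configure', 'start'],
                    {'configure': ['create'], 'start': ['configure']}))
# [('start', 'install'), ('task', 'create'), ('task', 'configure'),
#  ('task', 'start'), ('end', 'install')]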
+ """ + tasks = [] + for dependency in dependencies: + if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)): + dependency_name = dependency.id + else: + dependency_name = self._end_graph_suffix(dependency.id) + tasks.extend(task for task in self._ctx.execution.tasks + if self._model_to_api_id.get(task.id, None) == dependency_name) + return tasks diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py new file mode 100644 index 0000000..9eee1e1 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/events_logging.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +Workflow event logging. +""" + +from .. import events +from ... import modeling + + +def _get_task_name(task): + if isinstance(task.actor, modeling.model_bases.service_instance.RelationshipBase): + return '{source_node.name}->{target_node.name}'.format( + source_node=task.actor.source_node, target_node=task.actor.target_node) + else: + return task.actor.name + + +@events.start_task_signal.connect +def _start_task_handler(ctx, **kwargs): + # If the task has no function this is an empty task. + if ctx.task.function: + suffix = 'started...' 
+ logger = ctx.logger.info
+ else:
+ suffix = 'has no implementation'
+ logger = ctx.logger.debug
+
+ logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+ name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+
+
+@events.on_success_task_signal.connect
+def _success_task_handler(ctx, **kwargs):
+ if not ctx.task.function:
+ return
+ ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+ .format(name=_get_task_name(ctx.task), task=ctx.task))
+
+
+@events.on_failure_task_signal.connect
+def _failure_task_handler(ctx, traceback, **kwargs):
+ ctx.logger.error(
+ '{name} {task.interface_name}.{task.operation_name} failed'
+ .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+ )
+
+
+@events.start_workflow_signal.connect
+def _start_workflow_handler(context, **kwargs):
+ context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+
+
+@events.on_failure_workflow_signal.connect
+def _failure_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+
+
+@events.on_success_workflow_signal.connect
+def _success_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
+
+
+@events.on_cancelled_workflow_signal.connect
+def _cancel_workflow_handler(context, **kwargs):
+ context.logger.info("'{ctx.workflow_name}' workflow execution cancelled".format(ctx=context))
+
+
+@events.on_cancelling_workflow_signal.connect
+def _cancelling_workflow_handler(context, **kwargs):
+ context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py new file mode 100644 index 0000000..2a1d6b1 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/exceptions.py @@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow exceptions.
+"""
+
+import os
+
+from .. import exceptions
+
+
+class ExecutorException(exceptions.AriaError):
+ """
+ General executor exception.
+ """
+ pass
+
+
+class ProcessException(ExecutorException):
+ """
+ Raised when subprocess execution fails.
+ """ + + def __init__(self, command, stderr=None, stdout=None, return_code=None): + """ + Process class Exception + :param list command: child process command + :param str message: custom message + :param str stderr: child process stderr + :param str stdout: child process stdout + :param int return_code: child process exit code + """ + super(ProcessException, self).__init__("child process failed") + self.command = command + self.stderr = stderr + self.stdout = stdout + self.return_code = return_code + + @property + def explanation(self): + """ + Describes the error in detail + """ + return ( + 'Command "{error.command}" executed with an error.{0}' + 'code: {error.return_code}{0}' + 'error: {error.stderr}{0}' + 'output: {error.stdout}'.format(os.linesep, error=self)) + + +class AriaEngineError(exceptions.AriaError): + """ + Raised by the workflow engine. + """ + + +class TaskException(exceptions.AriaError): + """ + Raised by the task. + """ + + +class TaskCreationException(TaskException): + """ + Could not create the task. + """ + + +class OperationNotFoundException(TaskCreationException): + """ + Could not find an operation on the node or relationship. + """ + + +class PluginNotFoundException(TaskCreationException): + """ + Could not find a plugin matching the plugin specification. + """ diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py new file mode 100644 index 0000000..cafab74 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/__init__.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Task executors. +""" + + +from . import process, thread +from .base import BaseExecutor diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py new file mode 100644 index 0000000..e7d03ea --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py new file mode 100644 index 0000000..e7d03ea --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/base.py @@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Base class for task executors.
+"""
+
+from aria import logger
+from aria.orchestrator import events
+
+
+class BaseExecutor(logger.LoggerMixin):
+ """
+ Base class for task executors.
+ """
+ def _execute(self, ctx):
+ raise NotImplementedError
+
+ def execute(self, ctx):
+ """
+ Executes a task.
+
+ :param ctx: the task's operation context
+ """
+ if ctx.task.function:
+ self._execute(ctx)
+ else:
+ # In this case the task is missing a function. The task still reaches an
+ # executor, but since there is nothing to run we simply mark it as started
+ # and immediately successful, skipping the execution itself.
+ self._task_started(ctx)
+ self._task_succeeded(ctx)
+
+ def close(self):
+ """
+ Closes the executor.
+ """
+ pass
+
+ def terminate(self, task_id):
+ """
+ Terminates the executing task.
+ """
+ pass
+
+ @staticmethod
+ def _task_started(ctx):
+ events.start_task_signal.send(ctx)
+
+ @staticmethod
+ def _task_failed(ctx, exception, traceback=None):
+ events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+
+ @staticmethod
+ def _task_succeeded(ctx):
+ events.on_success_task_signal.send(ctx)
+
+
+class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
+ def execute(self, ctx, *args, **kwargs):
+ with ctx.persist_changes:
+ ctx.task.status = ctx.task.SUCCESS
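A concrete executor only has to implement _execute() and report status through the _task_started/_task_succeeded/_task_failed helpers, which fan out to the signal handlers shown earlier. A minimal synchronous executor along those lines (a sketch against the contract above; InlineExecutor is hypothetical and the error handling is simplified):

import sys

from aria.utils import imports, exceptions
from aria.orchestrator.workflows.executor.base import BaseExecutor


class InlineExecutor(BaseExecutor):
    """Hypothetical executor that runs the task function in the calling thread."""

    def _execute(self, ctx):
        self._task_started(ctx)
        try:
            # resolve the operation function from its dotted path and call it
            task_func = imports.load_attribute(ctx.task.function)
            arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues())
            task_func(ctx=ctx, **arguments)
            self._task_succeeded(ctx)
        except BaseException as e:
            self._task_failed(ctx, exception=e,
                              traceback=exceptions.get_exception_as_string(*sys.exc_info()))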
+ """ + + def __init__(self, app, *args, **kwargs): + super(CeleryExecutor, self).__init__(*args, **kwargs) + self._app = app + self._started_signaled = False + self._started_queue = Queue.Queue(maxsize=1) + self._tasks = {} + self._results = {} + self._receiver = None + self._stopped = False + self._receiver_thread = threading.Thread(target=self._events_receiver) + self._receiver_thread.daemon = True + self._receiver_thread.start() + self._started_queue.get(timeout=30) + + def _execute(self, ctx): + self._tasks[ctx.id] = ctx + arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues()) + arguments['ctx'] = ctx.context + self._results[ctx.id] = self._app.send_task( + ctx.operation_mapping, + kwargs=arguments, + task_id=ctx.task.id, + queue=self._get_queue(ctx)) + + def close(self): + self._stopped = True + if self._receiver: + self._receiver.should_stop = True + self._receiver_thread.join() + + @staticmethod + def _get_queue(task): + return None if task else None # TODO + + def _events_receiver(self): + with self._app.connection() as connection: + self._receiver = self._app.events.Receiver(connection, handlers={ + 'task-started': self._celery_task_started, + 'task-succeeded': self._celery_task_succeeded, + 'task-failed': self._celery_task_failed, + }) + for _ in self._receiver.itercapture(limit=None, timeout=None, wakeup=True): + if not self._started_signaled: + self._started_queue.put(True) + self._started_signaled = True + if self._stopped: + return + + def _celery_task_started(self, event): + self._task_started(self._tasks[event['uuid']]) + + def _celery_task_succeeded(self, event): + task, _ = self._remove_task(event['uuid']) + self._task_succeeded(task) + + def _celery_task_failed(self, event): + task, async_result = self._remove_task(event['uuid']) + try: + exception = async_result.result + except BaseException as e: + exception = RuntimeError( + 'Could not de-serialize exception of task {0} --> {1}: {2}' + .format(task.name, type(e).__name__, str(e))) + self._task_failed(task, exception=exception) + + def _remove_task(self, task_id): + return self._tasks.pop(task_id), self._results.pop(task_id) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py new file mode 100644 index 0000000..9314e5d --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/dry.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Dry task executor. +""" + +from datetime import datetime + +from . import base + + +class DryExecutor(base.BaseExecutor): # pylint: disable=abstract-method + """ + Dry task executor: prints task information without causing any side effects. 
+ """ + def execute(self, ctx): + with ctx.persist_changes: + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + + dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' + logger = ctx.logger.info if ctx.task.function else ctx.logger.debug + + if hasattr(ctx.task.actor, 'source_node'): + name = '{source_node.name}->{target_node.name}'.format( + source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node) + else: + name = ctx.task.actor.name + + if ctx.task.function: + logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) + logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) + else: + logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) + + # updating the task manually instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py new file mode 100644 index 0000000..185f15f --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/process.py @@ -0,0 +1,350 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Sub-process task executor. +""" + +# pylint: disable=wrong-import-position + +import os +import sys + +# As part of the process executor implementation, subprocess are started with this module as their +# entry point. We thus remove this module's directory from the python path if it happens to be +# there + +from collections import namedtuple + +script_dir = os.path.dirname(__file__) +if script_dir in sys.path: + sys.path.remove(script_dir) + +import contextlib +import io +import threading +import socket +import struct +import subprocess +import tempfile +import Queue +import pickle + +import psutil +import jsonpickle + +import aria +from aria.orchestrator.workflows.executor import base +from aria.extension import process_executor +from aria.utils import ( + imports, + exceptions, + process as process_utils +) + + +_INT_FMT = 'I' +_INT_SIZE = struct.calcsize(_INT_FMT) +UPDATE_TRACKED_CHANGES_FAILED_STR = \ + 'Some changes failed writing to storage. For more info refer to the log.' + + +_Task = namedtuple('_Task', 'proc, ctx') + + +class ProcessExecutor(base.BaseExecutor): + """ + Sub-process task executor. 
+ """ + + def __init__(self, plugin_manager=None, python_path=None, *args, **kwargs): + super(ProcessExecutor, self).__init__(*args, **kwargs) + self._plugin_manager = plugin_manager + + # Optional list of additional directories that should be added to + # subprocesses python path + self._python_path = python_path or [] + + # Flag that denotes whether this executor has been stopped + self._stopped = False + + # Contains reference to all currently running tasks + self._tasks = {} + + self._request_handlers = { + 'started': self._handle_task_started_request, + 'succeeded': self._handle_task_succeeded_request, + 'failed': self._handle_task_failed_request, + } + + # Server socket used to accept task status messages from subprocesses + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.bind(('localhost', 0)) + self._server_socket.listen(10) + self._server_port = self._server_socket.getsockname()[1] + + # Used to send a "closed" message to the listener when this executor is closed + self._messenger = _Messenger(task_id=None, port=self._server_port) + + # Queue object used by the listener thread to notify this constructed it has started + # (see last line of this __init__ method) + self._listener_started = Queue.Queue() + + # Listener thread to handle subprocesses task status messages + self._listener_thread = threading.Thread(target=self._listener) + self._listener_thread.daemon = True + self._listener_thread.start() + + # Wait for listener thread to actually start before returning + self._listener_started.get(timeout=60) + + def close(self): + if self._stopped: + return + self._stopped = True + # Listener thread may be blocked on "accept" call. This will wake it up with an explicit + # "closed" message + self._messenger.closed() + self._server_socket.close() + self._listener_thread.join(timeout=60) + + # we use set(self._tasks) since tasks may change in the process of closing + for task_id in set(self._tasks): + self.terminate(task_id) + + def terminate(self, task_id): + task = self._remove_task(task_id) + # The process might have managed to finish, thus it would not be in the tasks list + if task: + try: + parent_process = psutil.Process(task.proc.pid) + for child_process in reversed(parent_process.children(recursive=True)): + try: + child_process.kill() + except BaseException: + pass + parent_process.kill() + except BaseException: + pass + + def _execute(self, ctx): + self._check_closed() + + # Temporary file used to pass arguments to the started subprocess + file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') + os.close(file_descriptor) + with open(arguments_json_path, 'wb') as f: + f.write(pickle.dumps(self._create_arguments_dict(ctx))) + + env = self._construct_subprocess_env(task=ctx.task) + # Asynchronously start the operation in a subprocess + proc = subprocess.Popen( + [ + sys.executable, + os.path.expanduser(os.path.expandvars(__file__)), + os.path.expanduser(os.path.expandvars(arguments_json_path)) + ], + env=env) + + self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc) + + def _remove_task(self, task_id): + return self._tasks.pop(task_id, None) + + def _check_closed(self): + if self._stopped: + raise RuntimeError('Executor closed') + + def _create_arguments_dict(self, ctx): + return { + 'task_id': ctx.task.id, + 'function': ctx.task.function, + 'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.itervalues()), + 'port': self._server_port, + 'context': ctx.serialization_dict + } + + 
def _construct_subprocess_env(self, task): + env = os.environ.copy() + + if task.plugin_fk and self._plugin_manager: + # If this is a plugin operation, + # load the plugin on the subprocess env we're constructing + self._plugin_manager.load_plugin(task.plugin, env=env) + + # Add user supplied directories to injected PYTHONPATH + if self._python_path: + process_utils.append_to_pythonpath(*self._python_path, env=env) + + return env + + def _listener(self): + # Notify __init__ method this thread has actually started + self._listener_started.put(True) + while not self._stopped: + try: + with self._accept_request() as (request, response): + request_type = request['type'] + if request_type == 'closed': + break + request_handler = self._request_handlers.get(request_type) + if not request_handler: + raise RuntimeError('Invalid request type: {0}'.format(request_type)) + task_id = request['task_id'] + request_handler(task_id=task_id, request=request, response=response) + except BaseException as e: + self.logger.debug('Error in process executor listener: {0}'.format(e)) + + @contextlib.contextmanager + def _accept_request(self): + with contextlib.closing(self._server_socket.accept()[0]) as connection: + message = _recv_message(connection) + response = {} + try: + yield message, response + except BaseException as e: + response['exception'] = exceptions.wrap_if_needed(e) + raise + finally: + _send_message(connection, response) + + def _handle_task_started_request(self, task_id, **kwargs): + self._task_started(self._tasks[task_id].ctx) + + def _handle_task_succeeded_request(self, task_id, **kwargs): + task = self._remove_task(task_id) + if task: + self._task_succeeded(task.ctx) + + def _handle_task_failed_request(self, task_id, request, **kwargs): + task = self._remove_task(task_id) + if task: + self._task_failed( + task.ctx, exception=request['exception'], traceback=request['traceback']) + + +def _send_message(connection, message): + + # Packing the length of the entire msg using struct.pack. + # This enables later reading of the content. + def _pack(data): + return struct.pack(_INT_FMT, len(data)) + + data = jsonpickle.dumps(message) + msg_metadata = _pack(data) + connection.send(msg_metadata) + connection.sendall(data) + + +def _recv_message(connection): + # Retrieving the length of the msg to come. 
+ def _unpack(conn):
+ return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE))[0]
+
+ msg_metadata_len = _unpack(connection)
+ msg = _recv_bytes(connection, msg_metadata_len)
+ return jsonpickle.loads(msg)
+
+
+def _recv_bytes(connection, count):
+ result = io.BytesIO()
+ while True:
+ if not count:
+ return result.getvalue()
+ read = connection.recv(count)
+ if not read:
+ return result.getvalue()
+ result.write(read)
+ count -= len(read)
+
+
+class _Messenger(object):
+
+ def __init__(self, task_id, port):
+ self.task_id = task_id
+ self.port = port
+
+ def started(self):
+ """Task started message"""
+ self._send_message(type='started')
+
+ def succeeded(self):
+ """Task succeeded message"""
+ self._send_message(type='succeeded')
+
+ def failed(self, exception):
+ """Task failed message"""
+ self._send_message(type='failed', exception=exception)
+
+ def closed(self):
+ """Executor closed message"""
+ self._send_message(type='closed')
+
+ def _send_message(self, type, exception=None):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('localhost', self.port))
+ try:
+ _send_message(sock, {
+ 'type': type,
+ 'task_id': self.task_id,
+ 'exception': exceptions.wrap_if_needed(exception),
+ 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
+ })
+ response = _recv_message(sock)
+ response_exception = response.get('exception')
+ if response_exception:
+ raise response_exception
+ finally:
+ sock.close()
+
+
+def _main():
+ arguments_json_path = sys.argv[1]
+ # The arguments were pickled in binary mode by the parent process, so read in binary mode too
+ with open(arguments_json_path, 'rb') as f:
+ arguments = pickle.loads(f.read())
+
+ # arguments_json_path is a temporary file created by the parent process,
+ # so we remove it here
+ os.remove(arguments_json_path)
+
+ task_id = arguments['task_id']
+ port = arguments['port']
+ messenger = _Messenger(task_id=task_id, port=port)
+
+ function = arguments['function']
+ operation_arguments = arguments['operation_arguments']
+ context_dict = arguments['context']
+
+ try:
+ ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context'])
+ except BaseException as e:
+ messenger.failed(e)
+ return
+
+ try:
+ messenger.started()
+ task_func = imports.load_attribute(function)
+ aria.install_aria_extensions()
+ for decorate in process_executor.decorate():
+ task_func = decorate(task_func)
+ task_func(ctx=ctx, **operation_arguments)
+ ctx.close()
+ messenger.succeeded()
+ except BaseException as e:
+ ctx.close()
+ messenger.failed(e)
+
+
+if __name__ == '__main__':
+ _main()
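Parent and child processes talk over a localhost socket using a simple frame: a struct-packed 4-byte length followed by a jsonpickle payload. The framing can be exercised in isolation; a stdlib-only sketch over a socketpair (json stands in for jsonpickle, POSIX-only, and short reads are ignored for brevity):

import json
import socket
import struct

_INT_FMT = 'I'
_INT_SIZE = struct.calcsize(_INT_FMT)

def send_message(conn, message):
    # length prefix first, then the serialized payload
    data = json.dumps(message).encode('utf-8')
    conn.sendall(struct.pack(_INT_FMT, len(data)) + data)

def recv_message(conn):
    # read the length prefix, then exactly that many payload bytes
    length = struct.unpack(_INT_FMT, conn.recv(_INT_SIZE))[0]
    return json.loads(conn.recv(length).decode('utf-8'))

parent, child = socket.socketpair()
send_message(child, {'type': 'started', 'task_id': 42})
print(recv_message(parent))  # {'type': 'started', 'task_id': 42}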
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py new file mode 100644 index 0000000..170620e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py @@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Thread task executor.
+"""
+
+import Queue
+import sys
+import threading
+
+from aria.utils import imports, exceptions
+
+from .base import BaseExecutor
+
+
+class ThreadExecutor(BaseExecutor):
+ """
+ Thread task executor.
+
+ It's easier writing tests using this executor rather than the full-blown sub-process executor.
+
+ Note: This executor is incapable of running plugin operations.
+ """
+
+ def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
+ super(ThreadExecutor, self).__init__(*args, **kwargs)
+ self._stopped = False
+ self._close_timeout = close_timeout
+ self._queue = Queue.Queue()
+ self._pool = []
+ for i in range(pool_size):
+ name = 'ThreadExecutor-{index}'.format(index=i+1)
+ thread = threading.Thread(target=self._processor, name=name)
+ thread.daemon = True
+ thread.start()
+ self._pool.append(thread)
+
+ def _execute(self, ctx):
+ self._queue.put(ctx)
+
+ def close(self):
+ self._stopped = True
+ for thread in self._pool:
+ if self._close_timeout is None:
+ thread.join()
+ else:
+ thread.join(self._close_timeout)
+
+ def _processor(self):
+ while not self._stopped:
+ try:
+ ctx = self._queue.get(timeout=1)
+ self._task_started(ctx)
+ try:
+ task_func = imports.load_attribute(ctx.task.function)
+ arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues())
+ task_func(ctx=ctx, **arguments)
+ self._task_succeeded(ctx)
+ except BaseException as e:
+ self._task_failed(ctx,
+ exception=e,
+ traceback=exceptions.get_exception_as_string(*sys.exc_info()))
+ # The queue is empty (Queue.Empty was raised) or we are shutting down; since
+ # these are daemon threads, it is safe to simply continue the loop
+ except BaseException:
+ pass
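Of the executors above, ThreadExecutor is the lightweight choice for tests. A usage sketch (the engine wiring is schematic; in real code the executor is handed to the workflow engine rather than driven directly):

from aria.orchestrator.workflows.executor.thread import ThreadExecutor

executor = ThreadExecutor(pool_size=4)
try:
    pass  # pass 'executor' as the default executor when constructing the engine
finally:
    executor.close()  # waits up to close_timeout seconds per worker thread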