diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core')
4 files changed, 493 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..3f28136 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow core. +""" + +from . import engine diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py new file mode 100644 index 0000000..0ec3cd8 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py @@ -0,0 +1,185 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Workflow execution.
"""

import time
from datetime import datetime

from aria import logger
from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.context import operation

from .. import exceptions
from ..executor.base import StubTaskExecutor
# Import required so all signals are registered
from . import events_handler  # pylint: disable=unused-import


class Engine(logger.LoggerMixin):
    """
    Executes workflows.

    The engine polls the tasks of an execution in a loop: tasks that have ended are retired,
    tasks that became executable are handed to their executor, and the workflow-level signals
    (start, success, failure, cancelled) are sent along the way.
    """

    def __init__(self, executors, **kwargs):
        """
        :param executors: mapping used to look up the executor of a task by ``task._executor``;
         it is copied, and a ``StubTaskExecutor`` entry (keyed by the class) is added if missing
        :param kwargs: forwarded to the parent initializer
        """
        super(Engine, self).__init__(**kwargs)
        self._executors = executors.copy()
        self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())

    def execute(self, ctx, resuming=False, retry_failed=False):
        """
        Executes the workflow.

        :param ctx: workflow context; provides ``execution``, ``model``, ``resource``
        :param resuming: if true, ``on_resume_workflow_signal`` is sent before anything else
        :param retry_failed: passed along with the resume signal
        :raises BaseException: anything raised while executing is re-raised after the executing
         tasks were terminated and ``on_failure_workflow_signal`` was sent
        """
        if resuming:
            events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)

        tasks_tracker = _TasksTracker(ctx)

        try:
            events.start_workflow_signal.send(ctx)
            while True:
                # The execution status is re-read on every pass so a cancel request is noticed
                cancel = self._is_cancel(ctx)
                if cancel:
                    break
                # ``ended_tasks`` iterates over a snapshot, so retiring tasks here is safe
                for task in tasks_tracker.ended_tasks:
                    self._handle_ended_tasks(task)
                    tasks_tracker.finished(task)
                for task in tasks_tracker.executable_tasks:
                    tasks_tracker.executing(task)
                    self._handle_executable_task(ctx, task)
                if tasks_tracker.all_tasks_consumed:
                    break
                else:
                    time.sleep(0.1)
            if cancel:
                self._terminate_tasks(tasks_tracker.executing_tasks)
                events.on_cancelled_workflow_signal.send(ctx)
            else:
                events.on_success_workflow_signal.send(ctx)
        except BaseException as e:
            # Cleanup any remaining tasks
            self._terminate_tasks(tasks_tracker.executing_tasks)
            events.on_failure_workflow_signal.send(ctx, exception=e)
            raise

    def _terminate_tasks(self, tasks):
        """
        Asks the executor of every given task to terminate it.

        Termination is best effort: a failure to terminate one task is deliberately ignored so
        the remaining tasks still get their termination request.
        """
        for task in tasks:
            try:
                self._executors[task._executor].terminate(task.id)
            except BaseException:
                pass

    @staticmethod
    def cancel_execution(ctx):
        """
        Send a cancel request to the engine. If execution already started, execution status
        will be modified to ``cancelling`` status. If execution is in pending mode, execution status
        will be modified to ``cancelled`` directly.
        """
        events.on_cancelling_workflow_signal.send(ctx)

    @staticmethod
    def _is_cancel(ctx):
        """
        Returns whether the refreshed execution is in ``CANCELLING`` or ``CANCELLED`` status.
        """
        execution = ctx.model.execution.refresh(ctx.execution)
        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)

    def _handle_executable_task(self, ctx, task):
        """
        Builds the operation context of ``task`` and passes it to the task's executor.

        The ``sent_task_signal`` is only sent for non-stub tasks.
        """
        task_executor = self._executors[task._executor]

        # If the task is a stub, a default context is provided, else it should hold the context cls
        context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls
        op_ctx = context_cls(
            model_storage=ctx.model,
            resource_storage=ctx.resource,
            workdir=ctx._workdir,
            task_id=task.id,
            actor_id=task.actor.id if task.actor else None,
            service_id=task.execution.service.id,
            execution_id=task.execution.id,
            name=task.name
        )

        if not task._stub_type:
            events.sent_task_signal.send(op_ctx)
        task_executor.execute(op_ctx)

    @staticmethod
    def _handle_ended_tasks(task):
        """
        Fails the workflow if ``task`` failed and its failure is not to be ignored.

        :raises ExecutorException: if the task status is ``FAILED`` and ``ignore_failure`` is off
        """
        if task.status == models.Task.FAILED and not task.ignore_failure:
            raise exceptions.ExecutorException('Workflow failed')


class _TasksTracker(object):
    """
    Book-keeping of the tasks of a single execution.

    Every task lives in exactly one of three lists: executable (not handed to an executor yet),
    executing (handed to an executor and not retired yet) and executed (ended).
    """

    def __init__(self, ctx):
        self._ctx = ctx

        self._tasks = ctx.execution.tasks
        # Tasks which already ended (e.g. when resuming) are never handed out again
        self._executed_tasks = [task for task in self._tasks if task.has_ended()]
        self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
        self._executing_tasks = []

    @property
    def all_tasks_consumed(self):
        """True once every task was executed and nothing is executing anymore."""
        return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0

    def executing(self, task):
        """Moves ``task`` from the executable list to the executing list."""
        # Task executing could be retrying (thus removed and added earlier)
        if task not in self._executing_tasks:
            self._executable_tasks.remove(task)
            self._executing_tasks.append(task)

    def finished(self, task):
        """Moves ``task`` from the executing list to the executed list."""
        self._executing_tasks.remove(task)
        self._executed_tasks.append(task)

    @property
    def ended_tasks(self):
        """Yields the (refreshed) executing tasks for which ``has_ended()`` is true."""
        for task in self.executing_tasks:
            if task.has_ended():
                yield task

    @property
    def executable_tasks(self):
        """
        Yields the (refreshed) tasks which can be executed right now: the task is waiting, its
        ``due_at`` has passed and all of its dependencies were already executed.
        """
        now = datetime.utcnow()
        # we need both lists since retrying task are in the executing task list.
        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
            # Short-circuit so the dependency scan only runs for waiting tasks which are due
            if (task.is_waiting() and
                    task.due_at <= now and
                    all(dependency in self._executed_tasks
                        for dependency in task.dependencies)):
                yield task

    @property
    def executing_tasks(self):
        """Yields the (refreshed) tasks which are currently executing."""
        # Iterate over a snapshot: consumers call ``finished`` while iterating, and removing
        # items from the list being iterated would silently skip the following task.
        for task in self._update_tasks(list(self._executing_tasks)):
            yield task

    @property
    def executed_tasks(self):
        """Yields the (refreshed) tasks which were already executed."""
        for task in self._update_tasks(self._executed_tasks):
            yield task

    @property
    def tasks(self):
        """Yields all the (refreshed) tasks of the execution."""
        for task in self._update_tasks(self._tasks):
            yield task

    def _update_tasks(self, tasks):
        """Yields every given task after refreshing it through the model storage."""
        for task in tasks:
            yield self._ctx.model.task.refresh(task)


# diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
# new file mode 100644
# index 0000000..473475e
# --- /dev/null
# +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
# @@ -0,0 +1,170 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# """
# Workflow event handling.
"""
Workflow event handling.
"""

from datetime import (
    datetime,
    timedelta,
)

from ... import events
from ... import exceptions


@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
    """Marks the task of the operation context as ``SENT``."""
    with ctx.persist_changes:
        ctx.task.status = ctx.task.SENT


@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
    """Marks the task as ``STARTED``, stamps its start time and updates the node state."""
    with ctx.persist_changes:
        ctx.task.started_at = datetime.utcnow()
        ctx.task.status = ctx.task.STARTED
        _update_node_state_if_necessary(ctx, is_transitional=True)


@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
    """
    Either schedules the failed task for a retry or marks it as ``FAILED``.

    :param ctx: operation context of the failed task
    :param exception: the exception the task failed with; a ``TaskAbortException`` prevents a
     retry, a ``TaskRetryException`` may carry its own ``retry_interval``
    """
    with ctx.persist_changes:
        aborted = isinstance(exception, exceptions.TaskAbortException)
        has_attempts_left = (ctx.task.attempts_count < ctx.task.max_attempts or
                             ctx.task.max_attempts == ctx.task.INFINITE_RETRIES)
        # ignore_failure check here means the task will not be retried and it will be marked
        # as failed. The engine will also look at ignore_failure so it won't fail the
        # workflow.
        should_retry = not aborted and has_attempts_left and not ctx.task.ignore_failure

        if should_retry:
            retry_interval = None
            if isinstance(exception, exceptions.TaskRetryException):
                retry_interval = exception.retry_interval
            # Fall back to the interval configured on the task itself
            if retry_interval is None:
                retry_interval = ctx.task.retry_interval
            ctx.task.status = ctx.task.RETRYING
            ctx.task.attempts_count += 1
            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
        else:
            ctx.task.ended_at = datetime.utcnow()
            ctx.task.status = ctx.task.FAILED


@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
    """Marks the task as ``SUCCESS``, counts the attempt and updates the node state."""
    with ctx.persist_changes:
        ctx.task.ended_at = datetime.utcnow()
        ctx.task.status = ctx.task.SUCCESS
        ctx.task.attempts_count += 1

        _update_node_state_if_necessary(ctx)


@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
    """Marks the execution as ``STARTED`` unless a cancel request already reached it."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        # the execution may already be in the process of cancelling
        if execution.status in (execution.CANCELLING, execution.CANCELLED):
            return
        execution.status = execution.STARTED
        execution.started_at = datetime.utcnow()


@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
    """Marks the execution as ``FAILED`` and stores the string form of ``exception`` on it."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.error = str(exception)
        execution.status = execution.FAILED
        execution.ended_at = datetime.utcnow()


@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
    """Marks the execution as ``SUCCEEDED`` and stamps its end time."""
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.status = execution.SUCCEEDED
        execution.ended_at = datetime.utcnow()


@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
    """
    Marks the execution as ``CANCELLED``, unless it is already cancelled or already ended (in
    which case the late cancel request is only logged).
    """
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        # _workflow_cancelling function may have called this function already
        if execution.status == execution.CANCELLED:
            return
        # the execution may have already been finished
        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
        else:
            execution.status = execution.CANCELLED
            execution.ended_at = datetime.utcnow()


@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
    """
    Puts the execution and all of its non-ended tasks back to ``PENDING``.

    :param retry_failed: if true, failed tasks (whose failure is not ignored) also go back to
     ``PENDING`` with their attempts count reset
    """
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        execution.status = execution.PENDING
        # Any non ended task would be put back to pending state
        for task in execution.tasks:
            if not task.has_ended():
                task.status = task.PENDING

        if retry_failed:
            for task in execution.tasks:
                if task.status == task.FAILED and not task.ignore_failure:
                    task.attempts_count = 0
                    task.status = task.PENDING


@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
    """
    Handles a cancel request: a pending execution is cancelled right away, an ended execution
    only gets a log message, and any other execution moves to ``CANCELLING``.
    """
    with workflow_context.persist_changes:
        execution = workflow_context.execution
        if execution.status == execution.PENDING:
            return _workflow_cancelled(workflow_context=workflow_context)
        # the execution may have already been finished
        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
        else:
            execution.status = execution.CANCELLING


def _update_node_state_if_necessary(ctx, is_transitional=False):
    """
    Updates the state of the task's node when the task belongs to the ``Standard`` interface.

    :param ctx: operation context; nothing happens if it has no task or the task has no node
    :param is_transitional: forwarded to ``node.determine_state``
    """
    # TODO: this is not the right way to check! the interface name is arbitrary
    # and also will *never* be the type name
    node = ctx.task.node if ctx.task is not None else None
    if (node is not None) and \
        (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
                                     'tosca:Standard')):
        state = node.determine_state(op_name=ctx.task.operation_name,
                                     is_transitional=is_transitional)
        if state:
            node.state = state
            ctx.model.node.update(node)


def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
    """
    Logs (info level) that the execution reached ``status`` before the cancel request was
    fully processed.
    """
    # The two literals are concatenated by the parser, so the separating space has to be part
    # of one of them.
    workflow_context.logger.info(
        "'{workflow_name}' workflow execution {status} before the cancel request "
        "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))


# diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
# new file mode 100644
# index 0000000..81543d5
# --- /dev/null
# +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
# @@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ....modeling import models
from .. import executor, api


class GraphCompiler(object):
    """
    Turns an API task graph into model tasks which are stored through the context's model
    storage, wrapping every (sub)graph with a start and an end stub task.
    """

    def __init__(self, ctx, default_executor):
        """
        :param ctx: workflow context; its ``execution`` and ``model.task`` storage are used
        :param default_executor: executor passed to ``models.Task.from_api_task`` for
         operation tasks
        """
        self._ctx = ctx
        self._default_executor = default_executor
        self._stub_executor = executor.base.StubTaskExecutor
        # model task id -> id of the API task (or start/end marker) it was created for
        self._model_to_api_id = {}

    def compile(self,
                task_graph,
                start_stub_type=models.Task.START_WORKFLOW,
                end_stub_type=models.Task.END_WORKFLOW,
                depends_on=()):
        """
        Translates the user graph to the execution graph.

        :param task_graph: The user's graph
        :param start_stub_type: internal use (stub type of the start marker)
        :param end_stub_type: internal use (stub type of the end marker)
        :param depends_on: internal use (model tasks the start marker depends on)
        :raises RuntimeError: if the graph holds a task of an unknown kind
        """
        graph_id = task_graph.id

        # Start marker of this (sub)graph
        start_task = self._create_stub_task(
            start_stub_type,
            list(depends_on),
            self._start_graph_suffix(graph_id),
            task_graph.name,
        )

        for api_task in task_graph.topological_order(reverse=True):
            dependencies = self._get_tasks_from_dependencies(
                task_graph.get_dependencies(api_task))
            if not dependencies:
                # A task without dependencies hangs off the start marker
                dependencies = [start_task]

            if isinstance(api_task, api.task.OperationTask):
                self._create_operation_task(api_task, dependencies)
                continue

            if isinstance(api_task, api.task.WorkflowTask):
                # Nested graph: compile it recursively with sub-workflow markers.
                # NOTE(review): ``START_SUBWROFKLOW`` is spelled exactly as the attribute this
                # code has always referenced - confirm its declaration before renaming it.
                self.compile(
                    api_task,
                    models.Task.START_SUBWROFKLOW,
                    models.Task.END_SUBWORKFLOW,
                    dependencies,
                )
                continue

            if isinstance(api_task, api.task.StubTask):
                self._create_stub_task(models.Task.STUB, dependencies, api_task.id)
                continue

            raise RuntimeError('Undefined state')

        # End marker: depends on every task nothing else depends on
        end_dependencies = self._get_non_dependent_tasks(self._ctx.execution)
        if not end_dependencies:
            end_dependencies = [start_task]
        self._create_stub_task(
            end_stub_type,
            end_dependencies,
            self._end_graph_suffix(graph_id),
            task_graph.name,
        )

    def _create_stub_task(self, stub_type, dependencies, api_id, name=None):
        """Creates, stores and returns a stub model task registered under ``api_id``."""
        stub = models.Task(
            name=name,
            dependencies=dependencies,
            execution=self._ctx.execution,
            _executor=self._stub_executor,
            _stub_type=stub_type,
        )
        self._ctx.model.task.put(stub)
        self._model_to_api_id[stub.id] = api_id
        return stub

    def _create_operation_task(self, api_task, dependencies):
        """Creates, stores and returns the model task of an API operation task."""
        operation_task = models.Task.from_api_task(
            api_task, self._default_executor, dependencies=dependencies)
        self._ctx.model.task.put(operation_task)
        self._model_to_api_id[operation_task.id] = api_task.id
        return operation_task

    @staticmethod
    def _start_graph_suffix(api_id):
        """Returns the id under which the start marker of graph ``api_id`` is registered."""
        return '{0}-Start'.format(api_id)

    @staticmethod
    def _end_graph_suffix(api_id):
        """Returns the id under which the end marker of graph ``api_id`` is registered."""
        return '{0}-End'.format(api_id)

    @staticmethod
    def _get_non_dependent_tasks(execution):
        """Returns the tasks of ``execution`` which no other task lists as a dependency."""
        depended_upon = set()
        for task in execution.tasks:
            depended_upon.update(task.dependencies)
        return list(set(execution.tasks) - depended_upon)

    def _get_tasks_from_dependencies(self, dependencies):
        """
        Returns the model tasks which were created for the given API dependencies.

        Stub and operation tasks are looked up by their own id; any other dependency is looked
        up by the id of its end marker.
        """
        leaf_types = (api.task.StubTask, api.task.OperationTask)
        matched = []
        for dependency in dependencies:
            if isinstance(dependency, leaf_types):
                wanted_api_id = dependency.id
            else:
                wanted_api_id = self._end_graph_suffix(dependency.id)
            for model_task in self._ctx.execution.tasks:
                if self._model_to_api_id.get(model_task.id) == wanted_api_id:
                    matched.append(model_task)
        return matched