summary | refs | log | tree | commit | diff | stats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core')
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py 20
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py 185
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py 170
-rw-r--r-- azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py 118
4 files changed, 493 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py
new file mode 100644
index 0000000..3f28136
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow core.
+"""
+
+from . import engine
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py
new file mode 100644
index 0000000..0ec3cd8
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/engine.py
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow execution.
+"""
+
+import time
+from datetime import datetime
+
+from aria import logger
+from aria.modeling import models
+from aria.orchestrator import events
+from aria.orchestrator.context import operation
+
+from .. import exceptions
+from ..executor.base import StubTaskExecutor
+# Import required so all signals are registered
+from . import events_handler # pylint: disable=unused-import
+
+
+class Engine(logger.LoggerMixin):
+    """
+    Executes workflows by polling the execution's tasks and handing them to their executors.
+    """
+
+    def __init__(self, executors, **kwargs):
+        """
+        :param executors: executors keyed by the value stored in ``task._executor``; the
+         mapping is copied and a ``StubTaskExecutor`` entry is added when it is missing
+        :param kwargs: passed through to the parent initializer
+        """
+        super(Engine, self).__init__(**kwargs)
+        registry = executors.copy()
+        registry.setdefault(StubTaskExecutor, StubTaskExecutor())
+        self._executors = registry
+
+    def execute(self, ctx, resuming=False, retry_failed=False):
+        """
+        Executes the workflow.
+
+        Polls until either a cancel is detected or every task has been consumed, sending the
+        matching workflow signal (started, cancelled, succeeded or failed) along the way.
+
+        :param ctx: workflow context
+        :param resuming: when true, the resume signal is sent before anything else
+        :param retry_failed: forwarded together with the resume signal
+        :raises BaseException: whatever was raised while executing; running tasks are
+         terminated and the failure signal is sent before it is re-raised
+        """
+        if resuming:
+            events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
+
+        tracker = _TasksTracker(ctx)
+
+        try:
+            events.start_workflow_signal.send(ctx)
+
+            cancelled = self._is_cancel(ctx)
+            while not cancelled:
+                for ended_task in tracker.ended_tasks:
+                    self._handle_ended_tasks(ended_task)
+                    tracker.finished(ended_task)
+                for runnable_task in tracker.executable_tasks:
+                    tracker.executing(runnable_task)
+                    self._handle_executable_task(ctx, runnable_task)
+                if tracker.all_tasks_consumed:
+                    break
+                # Nothing more to do right now; wait a bit before polling again
+                time.sleep(0.1)
+                cancelled = self._is_cancel(ctx)
+
+            if not cancelled:
+                events.on_success_workflow_signal.send(ctx)
+            else:
+                self._terminate_tasks(tracker.executing_tasks)
+                events.on_cancelled_workflow_signal.send(ctx)
+        except BaseException as error:
+            # Cleanup any remaining tasks
+            self._terminate_tasks(tracker.executing_tasks)
+            events.on_failure_workflow_signal.send(ctx, exception=error)
+            raise
+
+    def _terminate_tasks(self, tasks):
+        """
+        Asks the executor of each given task to terminate it.
+
+        Termination is best effort: anything raised while terminating a task is discarded so
+        the remaining tasks still get their turn.
+        """
+        for task in tasks:
+            try:
+                task_executor = self._executors[task._executor]
+                task_executor.terminate(task.id)
+            except BaseException:
+                continue
+
+    @staticmethod
+    def cancel_execution(ctx):
+        """
+        Send a cancel request to the engine. If execution already started, execution status
+        will be modified to ``cancelling`` status. If execution is in pending mode, execution
+        status will be modified to ``cancelled`` directly.
+        """
+        events.on_cancelling_workflow_signal.send(ctx)
+
+    @staticmethod
+    def _is_cancel(ctx):
+        """
+        Refreshes the execution from the model storage and tells whether it is being, or has
+        already been, cancelled.
+        """
+        refreshed = ctx.model.execution.refresh(ctx.execution)
+        current_status = refreshed.status
+        return current_status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
+
+    def _handle_executable_task(self, ctx, task):
+        """
+        Builds the operation context of the task and passes it to the task's executor.
+        """
+        task_executor = self._executors[task._executor]
+
+        # Stub tasks get the default context; any other task holds its own context class
+        if task._stub_type:
+            context_cls = operation.BaseOperationContext
+        else:
+            context_cls = task._context_cls
+
+        context_kwargs = dict(
+            model_storage=ctx.model,
+            resource_storage=ctx.resource,
+            workdir=ctx._workdir,
+            task_id=task.id,
+            actor_id=task.actor.id if task.actor else None,
+            service_id=task.execution.service.id,
+            execution_id=task.execution.id,
+            name=task.name
+        )
+        op_ctx = context_cls(**context_kwargs)
+
+        # The sent signal is only emitted for non-stub tasks
+        if not task._stub_type:
+            events.sent_task_signal.send(op_ctx)
+        task_executor.execute(op_ctx)
+
+    @staticmethod
+    def _handle_ended_tasks(task):
+        """
+        Fails the workflow if the ended task failed and its failure is not to be ignored.
+
+        :raises exceptions.ExecutorException: for a failed task without ``ignore_failure``
+        """
+        has_failed = task.status == models.Task.FAILED
+        if has_failed and not task.ignore_failure:
+            raise exceptions.ExecutorException('Workflow failed')
+
+
+class _TasksTracker(object):
+    """
+    Bookkeeping for the engine loop: every task of the execution is either executable (not
+    yet handed to an executor), executing, or executed.
+    """
+
+    def __init__(self, ctx):
+        """
+        :param ctx: workflow context; ``ctx.execution.tasks`` is read to seed the tracker
+        """
+        self._ctx = ctx
+
+        self._tasks = ctx.execution.tasks
+        # Tasks which have already ended are considered executed from the start
+        self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+        self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
+        self._executing_tasks = []
+
+    @property
+    def all_tasks_consumed(self):
+        """
+        ``True`` once every task was executed and nothing is executing anymore.
+        """
+        return len(self._executed_tasks) == len(self._tasks) and not self._executing_tasks
+
+    def executing(self, task):
+        """
+        Moves the task from the executable list to the executing list.
+        """
+        # Task executing could be retrying (thus removed and added earlier)
+        if task not in self._executing_tasks:
+            self._executable_tasks.remove(task)
+            self._executing_tasks.append(task)
+
+    def finished(self, task):
+        """
+        Moves the task from the executing list to the executed list.
+        """
+        self._executing_tasks.remove(task)
+        self._executed_tasks.append(task)
+
+    @property
+    def ended_tasks(self):
+        """
+        Yields the (refreshed) executing tasks which have ended.
+
+        It is safe to call :meth:`finished` on a yielded task while still iterating.
+        """
+        for task in self.executing_tasks:
+            if task.has_ended():
+                yield task
+
+    @property
+    def executable_tasks(self):
+        """
+        Yields the (refreshed) tasks which are waiting, are due, and whose dependencies were
+        all executed.
+        """
+        now = datetime.utcnow()
+        # we need both lists since retrying task are in the executing task list.
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+            # Short-circuit so the dependency scan only runs for waiting tasks that are due
+            if (task.is_waiting() and
+                    task.due_at <= now and
+                    all(dependency in self._executed_tasks
+                        for dependency in task.dependencies)):
+                yield task
+
+    @property
+    def executing_tasks(self):
+        """
+        Yields the (refreshed) tasks currently in the executing list.
+        """
+        for task in self._update_tasks(self._executing_tasks):
+            yield task
+
+    @property
+    def executed_tasks(self):
+        """
+        Yields the (refreshed) tasks in the executed list.
+        """
+        for task in self._update_tasks(self._executed_tasks):
+            yield task
+
+    @property
+    def tasks(self):
+        """
+        Yields all (refreshed) tasks of the execution.
+        """
+        for task in self._update_tasks(self._tasks):
+            yield task
+
+    def _update_tasks(self, tasks):
+        """
+        Yields each of the given tasks after refreshing it from the model storage.
+        """
+        # Iterate over a snapshot: consumers call ``finished``/``executing`` while looping,
+        # which mutates the tracked lists; iterating the live list would skip entries.
+        for task in list(tasks):
+            yield self._ctx.model.task.refresh(task)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
new file mode 100644
index 0000000..473475e
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/events_handler.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow event handling.
+"""
+
+from datetime import (
+ datetime,
+ timedelta,
+)
+
+from ... import events
+from ... import exceptions
+
+
+@events.sent_task_signal.connect
+def _task_sent(ctx, *args, **kwargs):
+    """
+    Marks the task of the given operation context as sent.
+    """
+    with ctx.persist_changes:
+        task = ctx.task
+        task.status = task.SENT
+
+
+@events.start_task_signal.connect
+def _task_started(ctx, *args, **kwargs):
+    """
+    Stamps the start time of the task, marks it as started, and moves the node to the
+    matching transitional state when applicable.
+    """
+    with ctx.persist_changes:
+        task = ctx.task
+        task.started_at = datetime.utcnow()
+        task.status = task.STARTED
+        _update_node_state_if_necessary(ctx, is_transitional=True)
+
+
+@events.on_failure_task_signal.connect
+def _task_failed(ctx, exception, *args, **kwargs):
+    """
+    Either schedules the failed task for another attempt or marks it as failed.
+
+    The task is retried only if the exception is not a ``TaskAbortException``, attempts are
+    left (or retries are infinite), and the task is not flagged with ``ignore_failure``.
+    """
+    with ctx.persist_changes:
+        task = ctx.task
+
+        aborted = isinstance(exception, exceptions.TaskAbortException)
+        attempts_left = (task.attempts_count < task.max_attempts or
+                         task.max_attempts == task.INFINITE_RETRIES)
+        # ignore_failure check here means the task will not be retried and it will be marked
+        # as failed. The engine will also look at ignore_failure so it won't fail the
+        # workflow.
+        failure_ignored = task.ignore_failure
+
+        if aborted or not attempts_left or failure_ignored:
+            task.ended_at = datetime.utcnow()
+            task.status = task.FAILED
+            return
+
+        # An interval carried by a retry exception takes precedence over the task's own
+        if isinstance(exception, exceptions.TaskRetryException):
+            retry_interval = exception.retry_interval
+        else:
+            retry_interval = None
+        if retry_interval is None:
+            retry_interval = task.retry_interval
+
+        task.status = task.RETRYING
+        task.attempts_count += 1
+        task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+
+
+@events.on_success_task_signal.connect
+def _task_succeeded(ctx, *args, **kwargs):
+    """
+    Stamps the end time of the task, marks it as successful, counts the attempt, and moves
+    the node to the matching state when applicable.
+    """
+    with ctx.persist_changes:
+        task = ctx.task
+        task.ended_at = datetime.utcnow()
+        task.status = task.SUCCESS
+        task.attempts_count += 1
+
+        _update_node_state_if_necessary(ctx)
+
+
+@events.start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    """
+    Marks the execution as started, unless a cancel is already under way.
+    """
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        # the execution may already be in the process of cancelling
+        cancel_requested = execution.status in (execution.CANCELLING, execution.CANCELLED)
+        if not cancel_requested:
+            execution.status = execution.STARTED
+            execution.started_at = datetime.utcnow()
+
+
+@events.on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    """
+    Records the error text on the execution, marks it as failed and stamps its end time.
+    """
+    with workflow_context.persist_changes:
+        failed_execution = workflow_context.execution
+        error_text = str(exception)
+        failed_execution.error = error_text
+        failed_execution.status = failed_execution.FAILED
+        failed_execution.ended_at = datetime.utcnow()
+
+
+@events.on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    """
+    Marks the execution as succeeded and stamps its end time.
+    """
+    with workflow_context.persist_changes:
+        finished_execution = workflow_context.execution
+        finished_execution.status = finished_execution.SUCCEEDED
+        finished_execution.ended_at = datetime.utcnow()
+
+
+@events.on_cancelled_workflow_signal.connect
+def _workflow_cancelled(workflow_context, *args, **kwargs):
+    """
+    Marks the execution as cancelled and stamps its end time, unless it is already cancelled
+    or has already ended (in which case the late cancel is only logged).
+    """
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        current_status = execution.status
+
+        # _workflow_cancelling function may have called this function already
+        if current_status == execution.CANCELLED:
+            return
+
+        # the execution may have already been finished
+        if current_status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context,
+                                                                execution.status)
+            return
+
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
+
+
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
+    """
+    Puts the execution, and every task which has not ended, back to pending.
+
+    :param retry_failed: when true, failed tasks which are not flagged with
+     ``ignore_failure`` are reset to pending as well, with a zeroed attempts count
+    """
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        execution.status = execution.PENDING
+
+        for task in execution.tasks:
+            if not task.has_ended():
+                # Any non ended task would be put back to pending state
+                task.status = task.PENDING
+            elif retry_failed and task.status == task.FAILED and not task.ignore_failure:
+                task.attempts_count = 0
+                task.status = task.PENDING
+
+
+
+@events.on_cancelling_workflow_signal.connect
+def _workflow_cancelling(workflow_context, *args, **kwargs):
+    """
+    Handles a cancel request: a pending execution is cancelled right away, an execution
+    which has already ended is only logged, and any other execution is marked as cancelling.
+    """
+    with workflow_context.persist_changes:
+        execution = workflow_context.execution
+        current_status = execution.status
+
+        if current_status == execution.PENDING:
+            return _workflow_cancelled(workflow_context=workflow_context)
+
+        # the execution may have already been finished
+        if current_status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context,
+                                                                execution.status)
+            return None
+
+        execution.status = execution.CANCELLING
+        return None
+
+
+def _update_node_state_if_necessary(ctx, is_transitional=False):
+    """
+    Updates the state of the task's node when the task ran an operation of the standard
+    lifecycle interface and the node reports a state for that operation.
+
+    :param ctx: operation context
+    :param is_transitional: passed to ``node.determine_state``
+    """
+    # TODO: this is not the right way to check! the interface name is arbitrary
+    # and also will *never* be the type name
+    if ctx.task is None:
+        node = None
+    else:
+        node = ctx.task.node
+    if node is None:
+        return
+
+    standard_interface_names = ('Standard', 'tosca.interfaces.node.lifecycle.Standard',
+                                'tosca:Standard')
+    if ctx.task.interface_name not in standard_interface_names:
+        return
+
+    new_state = node.determine_state(op_name=ctx.task.operation_name,
+                                     is_transitional=is_transitional)
+    if new_state:
+        node.state = new_state
+        ctx.model.node.update(node)
+
+
+def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):
+    """
+    Logs (at info level) that a cancel request arrived after the execution had ended.
+
+    :param workflow_context: workflow context; its ``logger`` and ``workflow_name`` are used
+    :param status: the status the execution ended with
+    """
+    # The two literals are joined by implicit concatenation, hence the trailing space
+    workflow_context.logger.info(
+        "'{workflow_name}' workflow execution {status} before the cancel request "
+        "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status))
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
new file mode 100644
index 0000000..81543d5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/core/graph_compiler.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ....modeling import models
+from .. import executor, api
+
+
+class GraphCompiler(object):
+    """
+    Compiles an API task graph into task models stored on the execution.
+    """
+
+    def __init__(self, ctx, default_executor):
+        """
+        :param ctx: workflow context whose execution receives the compiled tasks
+        :param default_executor: executor assigned to operation tasks
+        """
+        self._ctx = ctx
+        self._default_executor = default_executor
+        self._stub_executor = executor.base.StubTaskExecutor
+        # Maps the ID of each stored task model to the API ID it was created for
+        self._model_to_api_id = {}
+
+    def compile(self,
+                task_graph,
+                start_stub_type=models.Task.START_WORKFLOW,
+                end_stub_type=models.Task.END_WORKFLOW,
+                depends_on=()):
+        """
+        Translates the user graph to the execution graph.
+
+        :param task_graph: the user's graph
+        :param start_stub_type: internal use
+        :param end_stub_type: internal use
+        :param depends_on: internal use
+        """
+        depends_on = list(depends_on)
+
+        # Insert start marker
+        start_marker = self._create_stub_task(
+            start_stub_type,
+            depends_on,
+            self._start_graph_suffix(task_graph.id),
+            task_graph.name,
+        )
+
+        for api_task in task_graph.topological_order(reverse=True):
+            upstream = self._get_tasks_from_dependencies(task_graph.get_dependencies(api_task))
+            if not upstream:
+                # A task without dependencies hangs directly off the start marker
+                upstream = [start_marker]
+
+            if isinstance(api_task, api.task.OperationTask):
+                self._create_operation_task(api_task, upstream)
+            elif isinstance(api_task, api.task.WorkflowTask):
+                # Build the graph recursively while adding start and end markers
+                self.compile(api_task,
+                             models.Task.START_SUBWROFKLOW,
+                             models.Task.END_SUBWORKFLOW,
+                             upstream)
+            elif isinstance(api_task, api.task.StubTask):
+                self._create_stub_task(models.Task.STUB, upstream, api_task.id)
+            else:
+                raise RuntimeError('Undefined state')
+
+        # Insert end marker
+        end_dependencies = self._get_non_dependent_tasks(self._ctx.execution)
+        if not end_dependencies:
+            end_dependencies = [start_marker]
+        self._create_stub_task(
+            end_stub_type,
+            end_dependencies,
+            self._end_graph_suffix(task_graph.id),
+            task_graph.name
+        )
+
+    def _create_stub_task(self, stub_type, dependencies, api_id, name=None):
+        """
+        Creates and stores a stub task model, remembering the API ID it stands for.
+        """
+        stub_task = models.Task(
+            name=name,
+            dependencies=dependencies,
+            execution=self._ctx.execution,
+            _executor=self._stub_executor,
+            _stub_type=stub_type)
+        self._ctx.model.task.put(stub_task)
+        self._model_to_api_id[stub_task.id] = api_id
+        return stub_task
+
+    def _create_operation_task(self, api_task, dependencies):
+        """
+        Creates and stores a task model for an API operation task, remembering its API ID.
+        """
+        operation_task = models.Task.from_api_task(
+            api_task, self._default_executor, dependencies=dependencies)
+        self._ctx.model.task.put(operation_task)
+        self._model_to_api_id[operation_task.id] = api_task.id
+        return operation_task
+
+    @staticmethod
+    def _start_graph_suffix(api_id):
+        """
+        Returns the API ID used for the start marker of the given graph.
+        """
+        return '{0}-Start'.format(api_id)
+
+    @staticmethod
+    def _end_graph_suffix(api_id):
+        """
+        Returns the API ID used for the end marker of the given graph.
+        """
+        return '{0}-End'.format(api_id)
+
+    @staticmethod
+    def _get_non_dependent_tasks(execution):
+        """
+        Returns the tasks of the execution which no other task depends on.
+        """
+        depended_upon = set()
+        for task in execution.tasks:
+            depended_upon.update(task.dependencies)
+        return list(set(execution.tasks) - depended_upon)
+
+    def _get_tasks_from_dependencies(self, dependencies):
+        """
+        Returns task list from dependencies.
+        """
+        single_task_types = (api.task.StubTask, api.task.OperationTask)
+        matched_tasks = []
+        for dependency in dependencies:
+            if isinstance(dependency, single_task_types):
+                wanted_api_id = dependency.id
+            else:
+                # Any other dependency is matched through its end marker
+                wanted_api_id = self._end_graph_suffix(dependency.id)
+            for model_task in self._ctx.execution.tasks:
+                if self._model_to_api_id.get(model_task.id) == wanted_api_id:
+                    matched_tasks.append(model_task)
+        return matched_tasks