diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py | 715 |
1 files changed, 715 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py b/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py new file mode 100644 index 0000000..4d4f0fe --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py @@ -0,0 +1,715 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ARIA modeling orchestration module +""" + +# pylint: disable=no-self-argument, no-member, abstract-method +from datetime import datetime + +from sqlalchemy import ( + Column, + Integer, + Text, + DateTime, + Boolean, + Enum, + String, + Float, + orm, + PickleType) +from sqlalchemy.ext.declarative import declared_attr + +from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException) +from . import mixins +from . import ( + relationship, + types as modeling_types +) + + +class ExecutionBase(mixins.ModelMixin): + """ + Workflow execution. + """ + + __tablename__ = 'execution' + + __private_fields__ = ('service_fk', + 'service_template') + + SUCCEEDED = 'succeeded' + FAILED = 'failed' + CANCELLED = 'cancelled' + PENDING = 'pending' + STARTED = 'started' + CANCELLING = 'cancelling' + + STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING) + END_STATES = (SUCCEEDED, FAILED, CANCELLED) + + VALID_TRANSITIONS = { + PENDING: (STARTED, CANCELLED), + STARTED: END_STATES + (CANCELLING,), + CANCELLING: END_STATES, + # Retrying + CANCELLED: PENDING, + FAILED: PENDING + } + + # region one_to_many relationships + + @declared_attr + def inputs(cls): + """ + Execution parameters. + + :type: {:obj:`basestring`: :class:`Input`} + """ + return relationship.one_to_many(cls, 'input', dict_key='name') + + @declared_attr + def tasks(cls): + """ + Tasks. + + :type: [:class:`Task`] + """ + return relationship.one_to_many(cls, 'task') + + @declared_attr + def logs(cls): + """ + Log messages for the execution (including log messages for its tasks). + + :type: [:class:`Log`] + """ + return relationship.one_to_many(cls, 'log') + + # endregion + + # region many_to_one relationships + + @declared_attr + def service(cls): + """ + Associated service. + + :type: :class:`Service` + """ + return relationship.many_to_one(cls, 'service') + + # endregion + + # region association proxies + + @declared_attr + def service_name(cls): + return relationship.association_proxy('service', cls.name_column_name()) + + @declared_attr + def service_template(cls): + return relationship.association_proxy('service', 'service_template') + + @declared_attr + def service_template_name(cls): + return relationship.association_proxy('service', 'service_template_name') + + # endregion + + # region foreign keys + + @declared_attr + def service_fk(cls): + return relationship.foreign_key('service') + + # endregion + + created_at = Column(DateTime, index=True, doc=""" + Creation timestamp. + + :type: :class:`~datetime.datetime` + """) + + started_at = Column(DateTime, nullable=True, index=True, doc=""" + Started timestamp. + + :type: :class:`~datetime.datetime` + """) + + ended_at = Column(DateTime, nullable=True, index=True, doc=""" + Ended timestamp. + + :type: :class:`~datetime.datetime` + """) + + error = Column(Text, nullable=True, doc=""" + Error message. + + :type: :obj:`basestring` + """) + + status = Column(Enum(*STATES, name='execution_status'), default=PENDING, doc=""" + Status. + + :type: :obj:`basestring` + """) + + workflow_name = Column(Text, doc=""" + Workflow name. + + :type: :obj:`basestring` + """) + + @orm.validates('status') + def validate_status(self, key, value): + """Validation function that verifies execution status transitions are OK""" + try: + current_status = getattr(self, key) + except AttributeError: + return + valid_transitions = self.VALID_TRANSITIONS.get(current_status, []) + if all([current_status is not None, + current_status != value, + value not in valid_transitions]): + raise ValueError('Cannot change execution status from {current} to {new}'.format( + current=current_status, + new=value)) + return value + + def has_ended(self): + return self.status in self.END_STATES + + def is_active(self): + return not self.has_ended() and self.status != self.PENDING + + def __str__(self): + return '<{0} id=`{1}` (status={2})>'.format( + self.__class__.__name__, + getattr(self, self.name_column_name()), + self.status + ) + + +class TaskBase(mixins.ModelMixin): + """ + Represents the smallest unit of stateful execution in ARIA. The task state includes inputs, + outputs, as well as an atomic status, ensuring that the task can only be running once at any + given time. + + The Python :attr:`function` is usually provided by an associated :class:`Plugin`. The + :attr:`arguments` of the function should be set according to the specific signature of the + function. + + Tasks may be "one shot" or may be configured to run repeatedly in the case of failure. + + Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a + :class:`Relationship`, however this is not required. + """ + + __tablename__ = 'task' + + __private_fields__ = ('node_fk', + 'relationship_fk', + 'plugin_fk', + 'execution_fk') + + START_WORKFLOW = 'start_workflow' + END_WORKFLOW = 'end_workflow' + START_SUBWROFKLOW = 'start_subworkflow' + END_SUBWORKFLOW = 'end_subworkflow' + STUB = 'stub' + CONDITIONAL = 'conditional' + + STUB_TYPES = ( + START_WORKFLOW, + START_SUBWROFKLOW, + END_WORKFLOW, + END_SUBWORKFLOW, + STUB, + CONDITIONAL, + ) + + PENDING = 'pending' + RETRYING = 'retrying' + SENT = 'sent' + STARTED = 'started' + SUCCESS = 'success' + FAILED = 'failed' + STATES = ( + PENDING, + RETRYING, + SENT, + STARTED, + SUCCESS, + FAILED, + ) + INFINITE_RETRIES = -1 + + # region one_to_many relationships + + @declared_attr + def logs(cls): + """ + Log messages. + + :type: [:class:`Log`] + """ + return relationship.one_to_many(cls, 'log') + + @declared_attr + def arguments(cls): + """ + Arguments sent to the Python :attr:`function``. + + :type: {:obj:`basestring`: :class:`Argument`} + """ + return relationship.one_to_many(cls, 'argument', dict_key='name') + + # endregion + + # region many_one relationships + + @declared_attr + def execution(cls): + """ + Containing execution. + + :type: :class:`Execution` + """ + return relationship.many_to_one(cls, 'execution') + + @declared_attr + def node(cls): + """ + Node actor (can be ``None``). + + :type: :class:`Node` + """ + return relationship.many_to_one(cls, 'node') + + @declared_attr + def relationship(cls): + """ + Relationship actor (can be ``None``). + + :type: :class:`Relationship` + """ + return relationship.many_to_one(cls, 'relationship') + + @declared_attr + def plugin(cls): + """ + Associated plugin. + + :type: :class:`Plugin` + """ + return relationship.many_to_one(cls, 'plugin') + + # endregion + + # region association proxies + + @declared_attr + def node_name(cls): + return relationship.association_proxy('node', cls.name_column_name()) + + @declared_attr + def relationship_name(cls): + return relationship.association_proxy('relationship', cls.name_column_name()) + + @declared_attr + def execution_name(cls): + return relationship.association_proxy('execution', cls.name_column_name()) + + # endregion + + # region foreign keys + + @declared_attr + def execution_fk(cls): + return relationship.foreign_key('execution', nullable=True) + + @declared_attr + def node_fk(cls): + return relationship.foreign_key('node', nullable=True) + + @declared_attr + def relationship_fk(cls): + return relationship.foreign_key('relationship', nullable=True) + + @declared_attr + def plugin_fk(cls): + return relationship.foreign_key('plugin', nullable=True) + + # endregion + + status = Column(Enum(*STATES, name='status'), default=PENDING, doc=""" + Current atomic status ('pending', 'retrying', 'sent', 'started', 'success', 'failed'). + + :type: :obj:`basestring` + """) + + due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow(), doc=""" + Timestamp to start the task. + + :type: :class:`~datetime.datetime` + """) + + started_at = Column(DateTime, default=None, doc=""" + Started timestamp. + + :type: :class:`~datetime.datetime` + """) + + ended_at = Column(DateTime, default=None, doc=""" + Ended timestamp. + + :type: :class:`~datetime.datetime` + """) + + attempts_count = Column(Integer, default=1, doc=""" + How many attempts occurred. + + :type: :class:`~datetime.datetime` + """) + + function = Column(String, doc=""" + Full path to Python function. + + :type: :obj:`basestring` + """) + + max_attempts = Column(Integer, default=1, doc=""" + Maximum number of attempts allowed in case of task failure. + + :type: :obj:`int` + """) + + retry_interval = Column(Float, default=0, doc=""" + Interval between task retry attemps (in seconds). + + :type: :obj:`float` + """) + + ignore_failure = Column(Boolean, default=False, doc=""" + Set to ``True`` to ignore failures. + + :type: :obj:`bool` + """) + + interface_name = Column(String, doc=""" + Name of interface on node or relationship. + + :type: :obj:`basestring` + """) + + operation_name = Column(String, doc=""" + Name of operation in interface on node or relationship. + + :type: :obj:`basestring` + """) + + _api_id = Column(String) + _executor = Column(PickleType) + _context_cls = Column(PickleType) + _stub_type = Column(Enum(*STUB_TYPES)) + + @property + def actor(self): + """ + Actor of the task (node or relationship). + """ + return self.node or self.relationship + + @orm.validates('max_attempts') + def validate_max_attempts(self, _, value): # pylint: disable=no-self-use + """ + Validates that max attempts is either -1 or a positive number. + """ + if value < 1 and value != TaskBase.INFINITE_RETRIES: + raise ValueError('Max attempts can be either -1 (infinite) or any positive number. ' + 'Got {value}'.format(value=value)) + return value + + @staticmethod + def abort(message=None): + raise TaskAbortException(message) + + @staticmethod + def retry(message=None, retry_interval=None): + raise TaskRetryException(message, retry_interval=retry_interval) + + @declared_attr + def dependencies(cls): + return relationship.many_to_many(cls, self=True) + + def has_ended(self): + return self.status in (self.SUCCESS, self.FAILED) + + def is_waiting(self): + if self._stub_type: + return not self.has_ended() + else: + return self.status in (self.PENDING, self.RETRYING) + + @classmethod + def from_api_task(cls, api_task, executor, **kwargs): + instantiation_kwargs = {} + + if hasattr(api_task.actor, 'outbound_relationships'): + instantiation_kwargs['node'] = api_task.actor + elif hasattr(api_task.actor, 'source_node'): + instantiation_kwargs['relationship'] = api_task.actor + else: + raise RuntimeError('No operation context could be created for {actor.model_cls}' + .format(actor=api_task.actor)) + + instantiation_kwargs.update( + { + 'name': api_task.name, + 'status': cls.PENDING, + 'max_attempts': api_task.max_attempts, + 'retry_interval': api_task.retry_interval, + 'ignore_failure': api_task.ignore_failure, + 'execution': api_task._workflow_context.execution, + 'interface_name': api_task.interface_name, + 'operation_name': api_task.operation_name, + + # Only non-stub tasks have these fields + 'plugin': api_task.plugin, + 'function': api_task.function, + 'arguments': api_task.arguments, + '_context_cls': api_task._context_cls, + '_executor': executor, + } + ) + + instantiation_kwargs.update(**kwargs) + + return cls(**instantiation_kwargs) + + +class LogBase(mixins.ModelMixin): + """ + Single log message. + """ + + __tablename__ = 'log' + + __private_fields__ = ('execution_fk', + 'task_fk') + + # region many_to_one relationships + + @declared_attr + def execution(cls): + """ + Containing execution. + + :type: :class:`Execution` + """ + return relationship.many_to_one(cls, 'execution') + + @declared_attr + def task(cls): + """ + Containing task (can be ``None``). + + :type: :class:`Task` + """ + return relationship.many_to_one(cls, 'task') + + # endregion + + # region foreign keys + + @declared_attr + def execution_fk(cls): + return relationship.foreign_key('execution') + + @declared_attr + def task_fk(cls): + return relationship.foreign_key('task', nullable=True) + + # endregion + + level = Column(String, doc=""" + Log level. + + :type: :obj:`basestring` + """) + + msg = Column(String, doc=""" + Log message. + + :type: :obj:`basestring` + """) + + created_at = Column(DateTime, index=True, doc=""" + Creation timestamp. + + :type: :class:`~datetime.datetime` + """) + + traceback = Column(Text, doc=""" + Error traceback in case of failure. + + :type: :class:`~datetime.datetime` + """) + + def __str__(self): + return self.msg + + def __repr__(self): + name = (self.task.actor if self.task else self.execution).name + return '{name}: {self.msg}'.format(name=name, self=self) + + +class PluginBase(mixins.ModelMixin): + """ + Installed plugin. + + Plugins are usually packaged as `wagons <https://github.com/cloudify-cosmo/wagon>`__, which + are archives of one or more `wheels <https://packaging.python.org/distributing/#wheels>`__. + Most of these fields are indeed extracted from the installed wagon's metadata. + """ + + __tablename__ = 'plugin' + + # region one_to_many relationships + + @declared_attr + def tasks(cls): + """ + Associated Tasks. + + :type: [:class:`Task`] + """ + return relationship.one_to_many(cls, 'task') + + # endregion + + archive_name = Column(Text, nullable=False, index=True, doc=""" + Filename (not the full path) of the wagon's archive, often with a ``.wgn`` extension. + + :type: :obj:`basestring` + """) + + distribution = Column(Text, doc=""" + Name of the operating system on which the wagon was installed (e.g. ``ubuntu``). + + :type: :obj:`basestring` + """) + + distribution_release = Column(Text, doc=""" + Release of the operating system on which the wagon was installed (e.g. ``trusty``). + + :type: :obj:`basestring` + """) + + distribution_version = Column(Text, doc=""" + Version of the operating system on which the wagon was installed (e.g. ``14.04``). + + :type: :obj:`basestring` + """) + + package_name = Column(Text, nullable=False, index=True, doc=""" + Primary Python package name used when the wagon was installed, which is one of the wheels in the + wagon (e.g. ``cloudify-script-plugin``). + + :type: :obj:`basestring` + """) + + package_source = Column(Text, doc=""" + Full install string for the primary Python package name used when the wagon was installed (e.g. + ``cloudify-script-plugin==1.2``). + + :type: :obj:`basestring` + """) + + package_version = Column(Text, doc=""" + Version for the primary Python package name used when the wagon was installed (e.g. ``1.2``). + + :type: :obj:`basestring` + """) + + supported_platform = Column(Text, doc=""" + If the wheels are *all* pure Python then this would be "any", otherwise it would be the + installed platform name (e.g. ``linux_x86_64``). + + :type: :obj:`basestring` + """) + + supported_py_versions = Column(modeling_types.StrictList(basestring), doc=""" + Python versions supported by all the wheels (e.g. ``["py26", "py27"]``) + + :type: [:obj:`basestring`] + """) + + wheels = Column(modeling_types.StrictList(basestring), nullable=False, doc=""" + Filenames of the wheels archived in the wagon, often with a ``.whl`` extension. + + :type: [:obj:`basestring`] + """) + + uploaded_at = Column(DateTime, nullable=False, index=True, doc=""" + Timestamp for when the wagon was installed. + + :type: :class:`~datetime.datetime` + """) + + +class ArgumentBase(mixins.ParameterMixin): + """ + Python function argument parameter. + """ + + __tablename__ = 'argument' + + # region many_to_one relationships + + @declared_attr + def task(cls): + """ + Containing task (can be ``None``); + + :type: :class:`Task` + """ + return relationship.many_to_one(cls, 'task') + + @declared_attr + def operation(cls): + """ + Containing operation (can be ``None``); + + :type: :class:`Operation` + """ + return relationship.many_to_one(cls, 'operation') + + # endregion + + # region foreign keys + + @declared_attr + def task_fk(cls): + return relationship.foreign_key('task', nullable=True) + + @declared_attr + def operation_fk(cls): + return relationship.foreign_key('operation', nullable=True) + + # endregion |