summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py715
1 files changed, 715 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py b/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py
new file mode 100644
index 0000000..4d4f0fe
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/modeling/orchestration.py
@@ -0,0 +1,715 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ARIA modeling orchestration module
+"""
+
+# pylint: disable=no-self-argument, no-member, abstract-method
+from datetime import datetime
+
+from sqlalchemy import (
+ Column,
+ Integer,
+ Text,
+ DateTime,
+ Boolean,
+ Enum,
+ String,
+ Float,
+ orm,
+ PickleType)
+from sqlalchemy.ext.declarative import declared_attr
+
+from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException)
+from . import mixins
+from . import (
+ relationship,
+ types as modeling_types
+)
+
+
+class ExecutionBase(mixins.ModelMixin):
+ """
+ Workflow execution.
+ """
+
+ __tablename__ = 'execution'
+
+ __private_fields__ = ('service_fk',
+ 'service_template')
+
+ SUCCEEDED = 'succeeded'
+ FAILED = 'failed'
+ CANCELLED = 'cancelled'
+ PENDING = 'pending'
+ STARTED = 'started'
+ CANCELLING = 'cancelling'
+
+ STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING)
+ END_STATES = (SUCCEEDED, FAILED, CANCELLED)
+
+ VALID_TRANSITIONS = {
+ PENDING: (STARTED, CANCELLED),
+ STARTED: END_STATES + (CANCELLING,),
+ CANCELLING: END_STATES,
+ # Retrying
+ CANCELLED: PENDING,
+ FAILED: PENDING
+ }
+
+ # region one_to_many relationships
+
+ @declared_attr
+ def inputs(cls):
+ """
+ Execution parameters.
+
+ :type: {:obj:`basestring`: :class:`Input`}
+ """
+ return relationship.one_to_many(cls, 'input', dict_key='name')
+
+ @declared_attr
+ def tasks(cls):
+ """
+ Tasks.
+
+ :type: [:class:`Task`]
+ """
+ return relationship.one_to_many(cls, 'task')
+
+ @declared_attr
+ def logs(cls):
+ """
+ Log messages for the execution (including log messages for its tasks).
+
+ :type: [:class:`Log`]
+ """
+ return relationship.one_to_many(cls, 'log')
+
+ # endregion
+
+ # region many_to_one relationships
+
+ @declared_attr
+ def service(cls):
+ """
+ Associated service.
+
+ :type: :class:`Service`
+ """
+ return relationship.many_to_one(cls, 'service')
+
+ # endregion
+
+ # region association proxies
+
+ @declared_attr
+ def service_name(cls):
+ return relationship.association_proxy('service', cls.name_column_name())
+
+ @declared_attr
+ def service_template(cls):
+ return relationship.association_proxy('service', 'service_template')
+
+ @declared_attr
+ def service_template_name(cls):
+ return relationship.association_proxy('service', 'service_template_name')
+
+ # endregion
+
+ # region foreign keys
+
+ @declared_attr
+ def service_fk(cls):
+ return relationship.foreign_key('service')
+
+ # endregion
+
+ created_at = Column(DateTime, index=True, doc="""
+ Creation timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ started_at = Column(DateTime, nullable=True, index=True, doc="""
+ Started timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ ended_at = Column(DateTime, nullable=True, index=True, doc="""
+ Ended timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ error = Column(Text, nullable=True, doc="""
+ Error message.
+
+ :type: :obj:`basestring`
+ """)
+
+ status = Column(Enum(*STATES, name='execution_status'), default=PENDING, doc="""
+ Status.
+
+ :type: :obj:`basestring`
+ """)
+
+ workflow_name = Column(Text, doc="""
+ Workflow name.
+
+ :type: :obj:`basestring`
+ """)
+
+ @orm.validates('status')
+ def validate_status(self, key, value):
+ """Validation function that verifies execution status transitions are OK"""
+ try:
+ current_status = getattr(self, key)
+ except AttributeError:
+ return
+ valid_transitions = self.VALID_TRANSITIONS.get(current_status, [])
+ if all([current_status is not None,
+ current_status != value,
+ value not in valid_transitions]):
+ raise ValueError('Cannot change execution status from {current} to {new}'.format(
+ current=current_status,
+ new=value))
+ return value
+
+ def has_ended(self):
+ return self.status in self.END_STATES
+
+ def is_active(self):
+ return not self.has_ended() and self.status != self.PENDING
+
+ def __str__(self):
+ return '<{0} id=`{1}` (status={2})>'.format(
+ self.__class__.__name__,
+ getattr(self, self.name_column_name()),
+ self.status
+ )
+
+
+class TaskBase(mixins.ModelMixin):
+ """
+ Represents the smallest unit of stateful execution in ARIA. The task state includes inputs,
+ outputs, as well as an atomic status, ensuring that the task can only be running once at any
+ given time.
+
+ The Python :attr:`function` is usually provided by an associated :class:`Plugin`. The
+ :attr:`arguments` of the function should be set according to the specific signature of the
+ function.
+
+ Tasks may be "one shot" or may be configured to run repeatedly in the case of failure.
+
+ Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a
+ :class:`Relationship`, however this is not required.
+ """
+
+ __tablename__ = 'task'
+
+ __private_fields__ = ('node_fk',
+ 'relationship_fk',
+ 'plugin_fk',
+ 'execution_fk')
+
+ START_WORKFLOW = 'start_workflow'
+ END_WORKFLOW = 'end_workflow'
+ START_SUBWROFKLOW = 'start_subworkflow'
+ END_SUBWORKFLOW = 'end_subworkflow'
+ STUB = 'stub'
+ CONDITIONAL = 'conditional'
+
+ STUB_TYPES = (
+ START_WORKFLOW,
+ START_SUBWROFKLOW,
+ END_WORKFLOW,
+ END_SUBWORKFLOW,
+ STUB,
+ CONDITIONAL,
+ )
+
+ PENDING = 'pending'
+ RETRYING = 'retrying'
+ SENT = 'sent'
+ STARTED = 'started'
+ SUCCESS = 'success'
+ FAILED = 'failed'
+ STATES = (
+ PENDING,
+ RETRYING,
+ SENT,
+ STARTED,
+ SUCCESS,
+ FAILED,
+ )
+ INFINITE_RETRIES = -1
+
+ # region one_to_many relationships
+
+ @declared_attr
+ def logs(cls):
+ """
+ Log messages.
+
+ :type: [:class:`Log`]
+ """
+ return relationship.one_to_many(cls, 'log')
+
+ @declared_attr
+ def arguments(cls):
+ """
+ Arguments sent to the Python :attr:`function``.
+
+ :type: {:obj:`basestring`: :class:`Argument`}
+ """
+ return relationship.one_to_many(cls, 'argument', dict_key='name')
+
+ # endregion
+
+ # region many_one relationships
+
+ @declared_attr
+ def execution(cls):
+ """
+ Containing execution.
+
+ :type: :class:`Execution`
+ """
+ return relationship.many_to_one(cls, 'execution')
+
+ @declared_attr
+ def node(cls):
+ """
+ Node actor (can be ``None``).
+
+ :type: :class:`Node`
+ """
+ return relationship.many_to_one(cls, 'node')
+
+ @declared_attr
+ def relationship(cls):
+ """
+ Relationship actor (can be ``None``).
+
+ :type: :class:`Relationship`
+ """
+ return relationship.many_to_one(cls, 'relationship')
+
+ @declared_attr
+ def plugin(cls):
+ """
+ Associated plugin.
+
+ :type: :class:`Plugin`
+ """
+ return relationship.many_to_one(cls, 'plugin')
+
+ # endregion
+
+ # region association proxies
+
+ @declared_attr
+ def node_name(cls):
+ return relationship.association_proxy('node', cls.name_column_name())
+
+ @declared_attr
+ def relationship_name(cls):
+ return relationship.association_proxy('relationship', cls.name_column_name())
+
+ @declared_attr
+ def execution_name(cls):
+ return relationship.association_proxy('execution', cls.name_column_name())
+
+ # endregion
+
+ # region foreign keys
+
+ @declared_attr
+ def execution_fk(cls):
+ return relationship.foreign_key('execution', nullable=True)
+
+ @declared_attr
+ def node_fk(cls):
+ return relationship.foreign_key('node', nullable=True)
+
+ @declared_attr
+ def relationship_fk(cls):
+ return relationship.foreign_key('relationship', nullable=True)
+
+ @declared_attr
+ def plugin_fk(cls):
+ return relationship.foreign_key('plugin', nullable=True)
+
+ # endregion
+
+ status = Column(Enum(*STATES, name='status'), default=PENDING, doc="""
+ Current atomic status ('pending', 'retrying', 'sent', 'started', 'success', 'failed').
+
+ :type: :obj:`basestring`
+ """)
+
+ due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow(), doc="""
+ Timestamp to start the task.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ started_at = Column(DateTime, default=None, doc="""
+ Started timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ ended_at = Column(DateTime, default=None, doc="""
+ Ended timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ attempts_count = Column(Integer, default=1, doc="""
+ How many attempts occurred.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ function = Column(String, doc="""
+ Full path to Python function.
+
+ :type: :obj:`basestring`
+ """)
+
+ max_attempts = Column(Integer, default=1, doc="""
+ Maximum number of attempts allowed in case of task failure.
+
+ :type: :obj:`int`
+ """)
+
+ retry_interval = Column(Float, default=0, doc="""
+ Interval between task retry attemps (in seconds).
+
+ :type: :obj:`float`
+ """)
+
+ ignore_failure = Column(Boolean, default=False, doc="""
+ Set to ``True`` to ignore failures.
+
+ :type: :obj:`bool`
+ """)
+
+ interface_name = Column(String, doc="""
+ Name of interface on node or relationship.
+
+ :type: :obj:`basestring`
+ """)
+
+ operation_name = Column(String, doc="""
+ Name of operation in interface on node or relationship.
+
+ :type: :obj:`basestring`
+ """)
+
+ _api_id = Column(String)
+ _executor = Column(PickleType)
+ _context_cls = Column(PickleType)
+ _stub_type = Column(Enum(*STUB_TYPES))
+
+ @property
+ def actor(self):
+ """
+ Actor of the task (node or relationship).
+ """
+ return self.node or self.relationship
+
+ @orm.validates('max_attempts')
+ def validate_max_attempts(self, _, value): # pylint: disable=no-self-use
+ """
+ Validates that max attempts is either -1 or a positive number.
+ """
+ if value < 1 and value != TaskBase.INFINITE_RETRIES:
+ raise ValueError('Max attempts can be either -1 (infinite) or any positive number. '
+ 'Got {value}'.format(value=value))
+ return value
+
+ @staticmethod
+ def abort(message=None):
+ raise TaskAbortException(message)
+
+ @staticmethod
+ def retry(message=None, retry_interval=None):
+ raise TaskRetryException(message, retry_interval=retry_interval)
+
+ @declared_attr
+ def dependencies(cls):
+ return relationship.many_to_many(cls, self=True)
+
+ def has_ended(self):
+ return self.status in (self.SUCCESS, self.FAILED)
+
+ def is_waiting(self):
+ if self._stub_type:
+ return not self.has_ended()
+ else:
+ return self.status in (self.PENDING, self.RETRYING)
+
+ @classmethod
+ def from_api_task(cls, api_task, executor, **kwargs):
+ instantiation_kwargs = {}
+
+ if hasattr(api_task.actor, 'outbound_relationships'):
+ instantiation_kwargs['node'] = api_task.actor
+ elif hasattr(api_task.actor, 'source_node'):
+ instantiation_kwargs['relationship'] = api_task.actor
+ else:
+ raise RuntimeError('No operation context could be created for {actor.model_cls}'
+ .format(actor=api_task.actor))
+
+ instantiation_kwargs.update(
+ {
+ 'name': api_task.name,
+ 'status': cls.PENDING,
+ 'max_attempts': api_task.max_attempts,
+ 'retry_interval': api_task.retry_interval,
+ 'ignore_failure': api_task.ignore_failure,
+ 'execution': api_task._workflow_context.execution,
+ 'interface_name': api_task.interface_name,
+ 'operation_name': api_task.operation_name,
+
+ # Only non-stub tasks have these fields
+ 'plugin': api_task.plugin,
+ 'function': api_task.function,
+ 'arguments': api_task.arguments,
+ '_context_cls': api_task._context_cls,
+ '_executor': executor,
+ }
+ )
+
+ instantiation_kwargs.update(**kwargs)
+
+ return cls(**instantiation_kwargs)
+
+
+class LogBase(mixins.ModelMixin):
+ """
+ Single log message.
+ """
+
+ __tablename__ = 'log'
+
+ __private_fields__ = ('execution_fk',
+ 'task_fk')
+
+ # region many_to_one relationships
+
+ @declared_attr
+ def execution(cls):
+ """
+ Containing execution.
+
+ :type: :class:`Execution`
+ """
+ return relationship.many_to_one(cls, 'execution')
+
+ @declared_attr
+ def task(cls):
+ """
+ Containing task (can be ``None``).
+
+ :type: :class:`Task`
+ """
+ return relationship.many_to_one(cls, 'task')
+
+ # endregion
+
+ # region foreign keys
+
+ @declared_attr
+ def execution_fk(cls):
+ return relationship.foreign_key('execution')
+
+ @declared_attr
+ def task_fk(cls):
+ return relationship.foreign_key('task', nullable=True)
+
+ # endregion
+
+ level = Column(String, doc="""
+ Log level.
+
+ :type: :obj:`basestring`
+ """)
+
+ msg = Column(String, doc="""
+ Log message.
+
+ :type: :obj:`basestring`
+ """)
+
+ created_at = Column(DateTime, index=True, doc="""
+ Creation timestamp.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ traceback = Column(Text, doc="""
+ Error traceback in case of failure.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+ def __str__(self):
+ return self.msg
+
+ def __repr__(self):
+ name = (self.task.actor if self.task else self.execution).name
+ return '{name}: {self.msg}'.format(name=name, self=self)
+
+
+class PluginBase(mixins.ModelMixin):
+ """
+ Installed plugin.
+
+ Plugins are usually packaged as `wagons <https://github.com/cloudify-cosmo/wagon>`__, which
+ are archives of one or more `wheels <https://packaging.python.org/distributing/#wheels>`__.
+ Most of these fields are indeed extracted from the installed wagon's metadata.
+ """
+
+ __tablename__ = 'plugin'
+
+ # region one_to_many relationships
+
+ @declared_attr
+ def tasks(cls):
+ """
+ Associated Tasks.
+
+ :type: [:class:`Task`]
+ """
+ return relationship.one_to_many(cls, 'task')
+
+ # endregion
+
+ archive_name = Column(Text, nullable=False, index=True, doc="""
+ Filename (not the full path) of the wagon's archive, often with a ``.wgn`` extension.
+
+ :type: :obj:`basestring`
+ """)
+
+ distribution = Column(Text, doc="""
+ Name of the operating system on which the wagon was installed (e.g. ``ubuntu``).
+
+ :type: :obj:`basestring`
+ """)
+
+ distribution_release = Column(Text, doc="""
+ Release of the operating system on which the wagon was installed (e.g. ``trusty``).
+
+ :type: :obj:`basestring`
+ """)
+
+ distribution_version = Column(Text, doc="""
+ Version of the operating system on which the wagon was installed (e.g. ``14.04``).
+
+ :type: :obj:`basestring`
+ """)
+
+ package_name = Column(Text, nullable=False, index=True, doc="""
+ Primary Python package name used when the wagon was installed, which is one of the wheels in the
+ wagon (e.g. ``cloudify-script-plugin``).
+
+ :type: :obj:`basestring`
+ """)
+
+ package_source = Column(Text, doc="""
+ Full install string for the primary Python package name used when the wagon was installed (e.g.
+ ``cloudify-script-plugin==1.2``).
+
+ :type: :obj:`basestring`
+ """)
+
+ package_version = Column(Text, doc="""
+ Version for the primary Python package name used when the wagon was installed (e.g. ``1.2``).
+
+ :type: :obj:`basestring`
+ """)
+
+ supported_platform = Column(Text, doc="""
+ If the wheels are *all* pure Python then this would be "any", otherwise it would be the
+ installed platform name (e.g. ``linux_x86_64``).
+
+ :type: :obj:`basestring`
+ """)
+
+ supported_py_versions = Column(modeling_types.StrictList(basestring), doc="""
+ Python versions supported by all the wheels (e.g. ``["py26", "py27"]``)
+
+ :type: [:obj:`basestring`]
+ """)
+
+ wheels = Column(modeling_types.StrictList(basestring), nullable=False, doc="""
+ Filenames of the wheels archived in the wagon, often with a ``.whl`` extension.
+
+ :type: [:obj:`basestring`]
+ """)
+
+ uploaded_at = Column(DateTime, nullable=False, index=True, doc="""
+ Timestamp for when the wagon was installed.
+
+ :type: :class:`~datetime.datetime`
+ """)
+
+
+class ArgumentBase(mixins.ParameterMixin):
+ """
+ Python function argument parameter.
+ """
+
+ __tablename__ = 'argument'
+
+ # region many_to_one relationships
+
+ @declared_attr
+ def task(cls):
+ """
+ Containing task (can be ``None``);
+
+ :type: :class:`Task`
+ """
+ return relationship.many_to_one(cls, 'task')
+
+ @declared_attr
+ def operation(cls):
+ """
+ Containing operation (can be ``None``);
+
+ :type: :class:`Operation`
+ """
+ return relationship.many_to_one(cls, 'operation')
+
+ # endregion
+
+ # region foreign keys
+
+ @declared_attr
+ def task_fk(cls):
+ return relationship.foreign_key('task', nullable=True)
+
+ @declared_attr
+ def operation_fk(cls):
+ return relationship.foreign_key('operation', nullable=True)
+
+ # endregion