diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py new file mode 100644 index 0000000..8613ec3 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py @@ -0,0 +1,174 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Operation contexts. +""" + +import threading +from contextlib import contextmanager + +import aria +from aria.utils import file +from . import common + + +class BaseOperationContext(common.BaseContext): + """ + Base class for contexts used during operation creation and execution. + """ + + def __init__(self, task_id, actor_id, **kwargs): + self._task_id = task_id + self._actor_id = actor_id + self._thread_local = threading.local() + self._destroy_session = kwargs.pop('destroy_session', False) + logger_level = kwargs.pop('logger_level', None) + super(BaseOperationContext, self).__init__(**kwargs) + self._register_logger(task_id=self.task.id, level=logger_level) + + def __repr__(self): + details = 'function={task.function}; ' \ + 'operation_arguments={task.arguments}'\ + .format(task=self.task) + return '{name}({0})'.format(details, name=self.name) + + @property + def task(self): + """ + The task in the model storage. + """ + # SQLAlchemy prevents from accessing an object which was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + + if not hasattr(self._thread_local, 'task'): + self._thread_local.task = self.model.task.get(self._task_id) + return self._thread_local.task + + @property + def plugin_workdir(self): + """ + A work directory that is unique to the plugin and the service ID. + """ + if self.task.plugin is None: + return None + plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir, + self.service.id, + self.task.plugin.name) + file.makedirs(plugin_workdir) + return plugin_workdir + + @property + def serialization_dict(self): + context_dict = { + 'name': self.name, + 'service_id': self._service_id, + 'task_id': self._task_id, + 'actor_id': self._actor_id, + 'workdir': self._workdir, + 'model_storage': self.model.serialization_dict if self.model else None, + 'resource_storage': self.resource.serialization_dict if self.resource else None, + 'execution_id': self._execution_id, + 'logger_level': self.logger.level + } + return { + 'context_cls': self.__class__, + 'context': context_dict + } + + @classmethod + def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): + if model_storage: + model_storage = aria.application_model_storage(**model_storage) + if resource_storage: + resource_storage = aria.application_resource_storage(**resource_storage) + + return cls(model_storage=model_storage, + resource_storage=resource_storage, + destroy_session=True, + **kwargs) + + def close(self): + if self._destroy_session: + self.model.log._session.remove() + self.model.log._engine.dispose() + + @property + @contextmanager + def persist_changes(self): + yield + self.model.task.update(self.task) + + +class NodeOperationContext(BaseOperationContext): + """ + Context for node operations. + """ + + @property + def node(self): + """ + The node of the current operation. + """ + return self.model.node.get(self._actor_id) + + @property + def node_template(self): + """ + The node template of the current operation. + """ + return self.node.node_template + + +class RelationshipOperationContext(BaseOperationContext): + """ + Context for relationship operations. + """ + + @property + def relationship(self): + """ + The relationship instance of the current operation. + """ + return self.model.relationship.get(self._actor_id) + + @property + def source_node(self): + """ + The relationship source node. + """ + return self.relationship.source_node + + @property + def source_node_template(self): + """ + The relationship source node template. + """ + return self.source_node.node_template + + @property + def target_node(self): + """ + The relationship target node. + """ + return self.relationship.target_node + + @property + def target_node_template(self): + """ + The relationship target node template. + """ + return self.target_node.node_template |