summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py174
1 files changed, 174 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py
new file mode 100644
index 0000000..8613ec3
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Operation contexts.
+"""
+
+import threading
+from contextlib import contextmanager
+
+import aria
+from aria.utils import file
+from . import common
+
+
+class BaseOperationContext(common.BaseContext):
+ """
+ Base class for contexts used during operation creation and execution.
+ """
+
+ def __init__(self, task_id, actor_id, **kwargs):
+ self._task_id = task_id
+ self._actor_id = actor_id
+ self._thread_local = threading.local()
+ self._destroy_session = kwargs.pop('destroy_session', False)
+ logger_level = kwargs.pop('logger_level', None)
+ super(BaseOperationContext, self).__init__(**kwargs)
+ self._register_logger(task_id=self.task.id, level=logger_level)
+
+ def __repr__(self):
+ details = 'function={task.function}; ' \
+ 'operation_arguments={task.arguments}'\
+ .format(task=self.task)
+ return '{name}({0})'.format(details, name=self.name)
+
+ @property
+ def task(self):
+ """
+ The task in the model storage.
+ """
+ # SQLAlchemy prevents from accessing an object which was created on a different thread.
+ # So we retrieve the object from the storage if the current thread isn't the same as the
+ # original thread.
+
+ if not hasattr(self._thread_local, 'task'):
+ self._thread_local.task = self.model.task.get(self._task_id)
+ return self._thread_local.task
+
+ @property
+ def plugin_workdir(self):
+ """
+ A work directory that is unique to the plugin and the service ID.
+ """
+ if self.task.plugin is None:
+ return None
+ plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir,
+ self.service.id,
+ self.task.plugin.name)
+ file.makedirs(plugin_workdir)
+ return plugin_workdir
+
+ @property
+ def serialization_dict(self):
+ context_dict = {
+ 'name': self.name,
+ 'service_id': self._service_id,
+ 'task_id': self._task_id,
+ 'actor_id': self._actor_id,
+ 'workdir': self._workdir,
+ 'model_storage': self.model.serialization_dict if self.model else None,
+ 'resource_storage': self.resource.serialization_dict if self.resource else None,
+ 'execution_id': self._execution_id,
+ 'logger_level': self.logger.level
+ }
+ return {
+ 'context_cls': self.__class__,
+ 'context': context_dict
+ }
+
+ @classmethod
+ def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
+ if model_storage:
+ model_storage = aria.application_model_storage(**model_storage)
+ if resource_storage:
+ resource_storage = aria.application_resource_storage(**resource_storage)
+
+ return cls(model_storage=model_storage,
+ resource_storage=resource_storage,
+ destroy_session=True,
+ **kwargs)
+
+ def close(self):
+ if self._destroy_session:
+ self.model.log._session.remove()
+ self.model.log._engine.dispose()
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+ self.model.task.update(self.task)
+
+
+class NodeOperationContext(BaseOperationContext):
+ """
+ Context for node operations.
+ """
+
+ @property
+ def node(self):
+ """
+ The node of the current operation.
+ """
+ return self.model.node.get(self._actor_id)
+
+ @property
+ def node_template(self):
+ """
+ The node template of the current operation.
+ """
+ return self.node.node_template
+
+
+class RelationshipOperationContext(BaseOperationContext):
+ """
+ Context for relationship operations.
+ """
+
+ @property
+ def relationship(self):
+ """
+ The relationship instance of the current operation.
+ """
+ return self.model.relationship.get(self._actor_id)
+
+ @property
+ def source_node(self):
+ """
+ The relationship source node.
+ """
+ return self.relationship.source_node
+
+ @property
+ def source_node_template(self):
+ """
+ The relationship source node template.
+ """
+ return self.source_node.node_template
+
+ @property
+ def target_node(self):
+ """
+ The relationship target node.
+ """
+ return self.relationship.target_node
+
+ @property
+ def target_node_template(self):
+ """
+ The relationship target node template.
+ """
+ return self.target_node.node_template