summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py21
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py217
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py27
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py174
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py59
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py135
6 files changed, 633 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py
new file mode 100644
index 0000000..a87828d
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Contexts for workflows and operations.
+"""
+
+from . import workflow, operation
+from .toolbelt import toolbelt
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py
new file mode 100644
index 0000000..3c5f618
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Common code for contexts.
+"""
+
+import logging
+from contextlib import contextmanager
+from functools import partial
+
+import jinja2
+
+from aria import (
+ logger as aria_logger,
+ modeling
+)
+from aria.storage import exceptions
+
+from ...utils.uuid import generate_uuid
+
+
+class BaseContext(object):
+ """
+ Base class for contexts.
+ """
+
+ INSTRUMENTATION_FIELDS = (
+ modeling.models.Service.inputs,
+ modeling.models.ServiceTemplate.inputs,
+ modeling.models.Policy.properties,
+ modeling.models.PolicyTemplate.properties,
+ modeling.models.Node.attributes,
+ modeling.models.Node.properties,
+ modeling.models.NodeTemplate.attributes,
+ modeling.models.NodeTemplate.properties,
+ modeling.models.Group.properties,
+ modeling.models.GroupTemplate.properties,
+ modeling.models.Capability.properties,
+ # TODO ARIA-279: modeling.models.Capability.attributes,
+ modeling.models.CapabilityTemplate.properties,
+ # TODO ARIA-279: modeling.models.CapabilityTemplate.attributes
+ modeling.models.Relationship.properties,
+ modeling.models.Artifact.properties,
+ modeling.models.ArtifactTemplate.properties,
+ modeling.models.Interface.inputs,
+ modeling.models.InterfaceTemplate.inputs,
+ modeling.models.Operation.inputs,
+ modeling.models.OperationTemplate.inputs
+ )
+
+ class PrefixedLogger(object):
+ def __init__(self, base_logger, task_id=None):
+ self._logger = base_logger
+ self._task_id = task_id
+
+ def __getattr__(self, attribute):
+ if attribute.upper() in logging._levelNames:
+ return partial(self._logger_with_task_id, _level=attribute)
+ else:
+ return getattr(self._logger, attribute)
+
+ def _logger_with_task_id(self, *args, **kwargs):
+ level = kwargs.pop('_level')
+ kwargs.setdefault('extra', {})['task_id'] = self._task_id
+ return getattr(self._logger, level)(*args, **kwargs)
+
+ def __init__(self,
+ name,
+ service_id,
+ model_storage,
+ resource_storage,
+ execution_id,
+ workdir=None,
+ **kwargs):
+ super(BaseContext, self).__init__(**kwargs)
+ self._name = name
+ self._id = generate_uuid(variant='uuid')
+ self._model = model_storage
+ self._resource = resource_storage
+ self._service_id = service_id
+ self._workdir = workdir
+ self._execution_id = execution_id
+ self.logger = None
+
+ def _register_logger(self, level=None, task_id=None):
+ self.logger = self.PrefixedLogger(
+ logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id)
+ self.logger.setLevel(level or logging.DEBUG)
+ if not self.logger.handlers:
+ self.logger.addHandler(self._get_sqla_handler())
+
+ def _get_sqla_handler(self):
+ return aria_logger.create_sqla_log_handler(model=self._model,
+ log_cls=modeling.models.Log,
+ execution_id=self._execution_id)
+
+ def __repr__(self):
+ return (
+ '{name}(name={self.name}, '
+ 'deployment_id={self._service_id}, '
+ .format(name=self.__class__.__name__, self=self))
+
+ @contextmanager
+ def logging_handlers(self, handlers=None):
+ handlers = handlers or []
+ try:
+ for handler in handlers:
+ self.logger.addHandler(handler)
+ yield self.logger
+ finally:
+ for handler in handlers:
+ self.logger.removeHandler(handler)
+
+ @property
+ def model(self):
+ """
+ Storage model API ("MAPI").
+ """
+ return self._model
+
+ @property
+ def resource(self):
+ """
+ Storage resource API ("RAPI").
+ """
+ return self._resource
+
+ @property
+ def service_template(self):
+ """
+ Service template model.
+ """
+ return self.service.service_template
+
+ @property
+ def service(self):
+ """
+ Service instance model.
+ """
+ return self.model.service.get(self._service_id)
+
+ @property
+ def name(self):
+ """
+ Operation name.
+ """
+ return self._name
+
+ @property
+ def id(self):
+ """
+ Operation ID.
+ """
+ return self._id
+
+ def download_resource(self, destination, path=None):
+ """
+ Download a service template resource from the storage resource API ("RAPI").
+ """
+ try:
+ self.resource.service.download(entry_id=str(self.service.id),
+ destination=destination,
+ path=path)
+ except exceptions.StorageError:
+ self.resource.service_template.download(entry_id=str(self.service_template.id),
+ destination=destination,
+ path=path)
+
+ def download_resource_and_render(self, destination, path=None, variables=None):
+ """
+ Downloads a service template resource from the resource storage and renders its content as a
+ Jinja template using the provided variables. ``ctx`` is available to the template without
+ providing it explicitly.
+ """
+ resource_content = self.get_resource(path=path)
+ resource_content = self._render_resource(resource_content=resource_content,
+ variables=variables)
+ with open(destination, 'wb') as f:
+ f.write(resource_content)
+
+ def get_resource(self, path=None):
+ """
+ Reads a service instance resource as string from the resource storage.
+ """
+ try:
+ return self.resource.service.read(entry_id=str(self.service.id), path=path)
+ except exceptions.StorageError:
+ return self.resource.service_template.read(entry_id=str(self.service_template.id),
+ path=path)
+
+ def get_resource_and_render(self, path=None, variables=None):
+ """
+ Reads a service instance resource as string from the resource storage and renders it as a
+ Jinja template using the provided variables. ``ctx`` is available to the template without
+ providing it explicitly.
+ """
+ resource_content = self.get_resource(path=path)
+ return self._render_resource(resource_content=resource_content, variables=variables)
+
+ def _render_resource(self, resource_content, variables):
+ variables = variables or {}
+ variables.setdefault('ctx', self)
+ resource_template = jinja2.Template(resource_content)
+ return resource_template.render(variables)
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py
new file mode 100644
index 0000000..e46e2b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Context exceptions.
+"""
+
+from ..exceptions import OrchestratorError
+
+
+class ContextException(OrchestratorError):
+ """
+ Context based exception
+ """
+ pass
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py
new file mode 100644
index 0000000..8613ec3
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Operation contexts.
+"""
+
+import threading
+from contextlib import contextmanager
+
+import aria
+from aria.utils import file
+from . import common
+
+
+class BaseOperationContext(common.BaseContext):
+ """
+ Base class for contexts used during operation creation and execution.
+ """
+
+ def __init__(self, task_id, actor_id, **kwargs):
+ self._task_id = task_id
+ self._actor_id = actor_id
+ self._thread_local = threading.local()
+ self._destroy_session = kwargs.pop('destroy_session', False)
+ logger_level = kwargs.pop('logger_level', None)
+ super(BaseOperationContext, self).__init__(**kwargs)
+ self._register_logger(task_id=self.task.id, level=logger_level)
+
+ def __repr__(self):
+ details = 'function={task.function}; ' \
+ 'operation_arguments={task.arguments}'\
+ .format(task=self.task)
+ return '{name}({0})'.format(details, name=self.name)
+
+ @property
+ def task(self):
+ """
+ The task in the model storage.
+ """
+ # SQLAlchemy prevents from accessing an object which was created on a different thread.
+ # So we retrieve the object from the storage if the current thread isn't the same as the
+ # original thread.
+
+ if not hasattr(self._thread_local, 'task'):
+ self._thread_local.task = self.model.task.get(self._task_id)
+ return self._thread_local.task
+
+ @property
+ def plugin_workdir(self):
+ """
+ A work directory that is unique to the plugin and the service ID.
+ """
+ if self.task.plugin is None:
+ return None
+ plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir,
+ self.service.id,
+ self.task.plugin.name)
+ file.makedirs(plugin_workdir)
+ return plugin_workdir
+
+ @property
+ def serialization_dict(self):
+ context_dict = {
+ 'name': self.name,
+ 'service_id': self._service_id,
+ 'task_id': self._task_id,
+ 'actor_id': self._actor_id,
+ 'workdir': self._workdir,
+ 'model_storage': self.model.serialization_dict if self.model else None,
+ 'resource_storage': self.resource.serialization_dict if self.resource else None,
+ 'execution_id': self._execution_id,
+ 'logger_level': self.logger.level
+ }
+ return {
+ 'context_cls': self.__class__,
+ 'context': context_dict
+ }
+
+ @classmethod
+ def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
+ if model_storage:
+ model_storage = aria.application_model_storage(**model_storage)
+ if resource_storage:
+ resource_storage = aria.application_resource_storage(**resource_storage)
+
+ return cls(model_storage=model_storage,
+ resource_storage=resource_storage,
+ destroy_session=True,
+ **kwargs)
+
+ def close(self):
+ if self._destroy_session:
+ self.model.log._session.remove()
+ self.model.log._engine.dispose()
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+ self.model.task.update(self.task)
+
+
+class NodeOperationContext(BaseOperationContext):
+ """
+ Context for node operations.
+ """
+
+ @property
+ def node(self):
+ """
+ The node of the current operation.
+ """
+ return self.model.node.get(self._actor_id)
+
+ @property
+ def node_template(self):
+ """
+ The node template of the current operation.
+ """
+ return self.node.node_template
+
+
+class RelationshipOperationContext(BaseOperationContext):
+ """
+ Context for relationship operations.
+ """
+
+ @property
+ def relationship(self):
+ """
+ The relationship instance of the current operation.
+ """
+ return self.model.relationship.get(self._actor_id)
+
+ @property
+ def source_node(self):
+ """
+ The relationship source node.
+ """
+ return self.relationship.source_node
+
+ @property
+ def source_node_template(self):
+ """
+ The relationship source node template.
+ """
+ return self.source_node.node_template
+
+ @property
+ def target_node(self):
+ """
+ The relationship target node.
+ """
+ return self.relationship.target_node
+
+ @property
+ def target_node_template(self):
+ """
+ The relationship target node template.
+ """
+ return self.target_node.node_template
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py
new file mode 100644
index 0000000..a2e1122
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Tools for operations.
+"""
+
+from . import operation
+
+
+class NodeToolBelt(object):
+ """
+ Node operation tool belt.
+ """
+ def __init__(self, operation_context):
+ self._op_context = operation_context
+
+ @property
+ def host_ip(self):
+ """
+ The host ip of the current node
+ :return:
+ """
+ assert isinstance(self._op_context, operation.NodeOperationContext)
+ return self._op_context.node.host.attributes.get('ip')
+
+
+class RelationshipToolBelt(object):
+ """
+ Relationship operation tool belt.
+ """
+ def __init__(self, operation_context):
+ self._op_context = operation_context
+
+
+def toolbelt(operation_context):
+ """
+ Get a toolbelt from to the current operation executor.
+
+ :param operation_context:
+ """
+ if isinstance(operation_context, operation.NodeOperationContext):
+ return NodeToolBelt(operation_context)
+ elif isinstance(operation_context, operation.RelationshipOperationContext):
+ return RelationshipToolBelt(operation_context)
+ else:
+ raise RuntimeError("Operation context not supported")
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py
new file mode 100644
index 0000000..738d2fd
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow context.
+"""
+
+import threading
+from contextlib import contextmanager
+
+from .exceptions import ContextException
+from .common import BaseContext
+
+
+class WorkflowContext(BaseContext):
+ """
+ Context used during workflow creation and execution.
+ """
+ def __init__(self,
+ workflow_name,
+ parameters=None,
+ task_max_attempts=1,
+ task_retry_interval=0,
+ task_ignore_failure=False,
+ *args, **kwargs):
+ super(WorkflowContext, self).__init__(*args, **kwargs)
+ self._workflow_name = workflow_name
+ self._parameters = parameters or {}
+ self._task_max_attempts = task_max_attempts
+ self._task_retry_interval = task_retry_interval
+ self._task_ignore_failure = task_ignore_failure
+ self._execution_graph = None
+ self._register_logger()
+
+ def __repr__(self):
+ return (
+ '{name}(deployment_id={self._service_id}, '
+ 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format(
+ name=self.__class__.__name__, self=self))
+
+ @property
+ def workflow_name(self):
+ return self._workflow_name
+
+ @property
+ def execution(self):
+ """
+ Execution model.
+ """
+ return self.model.execution.get(self._execution_id)
+
+ @execution.setter
+ def execution(self, value):
+ """
+ Stores the execution in the storage model API ("MAPI").
+ """
+ self.model.execution.put(value)
+
+ @property
+ def node_templates(self):
+ """
+ Iterates over nodes templates.
+ """
+ key = 'service_{0}'.format(self.model.node_template.model_cls.name_column_name())
+
+ return self.model.node_template.iter(
+ filters={
+ key: getattr(self.service, self.service.name_column_name())
+ }
+ )
+
+ @property
+ def nodes(self):
+ """
+ Iterates over nodes.
+ """
+ key = 'service_{0}'.format(self.model.node.model_cls.name_column_name())
+ return self.model.node.iter(
+ filters={
+ key: getattr(self.service, self.service.name_column_name())
+ }
+ )
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+ self._model.execution.update(self.execution)
+
+
+class _CurrentContext(threading.local):
+ """
+ Provides a thread-level context, with sugar for the task MAPI.
+ """
+
+ def __init__(self):
+ super(_CurrentContext, self).__init__()
+ self._workflow_context = None
+
+ def _set(self, value):
+ self._workflow_context = value
+
+ def get(self):
+ """
+ Retrieves the current workflow context.
+ """
+ if self._workflow_context is not None:
+ return self._workflow_context
+ raise ContextException("No context was set")
+
+ @contextmanager
+ def push(self, workflow_context):
+ """
+ Switches the current context to the provided context.
+ """
+ prev_workflow_context = self._workflow_context
+ self._set(workflow_context)
+ try:
+ yield self
+ finally:
+ self._set(prev_workflow_context)
+
+current = _CurrentContext()