diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context')
6 files changed, 633 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py new file mode 100644 index 0000000..a87828d --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/__init__.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Contexts for workflows and operations. +""" + +from . import workflow, operation +from .toolbelt import toolbelt diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py new file mode 100644 index 0000000..3c5f618 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/common.py @@ -0,0 +1,217 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Common code for contexts. +""" + +import logging +from contextlib import contextmanager +from functools import partial + +import jinja2 + +from aria import ( + logger as aria_logger, + modeling +) +from aria.storage import exceptions + +from ...utils.uuid import generate_uuid + + +class BaseContext(object): + """ + Base class for contexts. + """ + + INSTRUMENTATION_FIELDS = ( + modeling.models.Service.inputs, + modeling.models.ServiceTemplate.inputs, + modeling.models.Policy.properties, + modeling.models.PolicyTemplate.properties, + modeling.models.Node.attributes, + modeling.models.Node.properties, + modeling.models.NodeTemplate.attributes, + modeling.models.NodeTemplate.properties, + modeling.models.Group.properties, + modeling.models.GroupTemplate.properties, + modeling.models.Capability.properties, + # TODO ARIA-279: modeling.models.Capability.attributes, + modeling.models.CapabilityTemplate.properties, + # TODO ARIA-279: modeling.models.CapabilityTemplate.attributes + modeling.models.Relationship.properties, + modeling.models.Artifact.properties, + modeling.models.ArtifactTemplate.properties, + modeling.models.Interface.inputs, + modeling.models.InterfaceTemplate.inputs, + modeling.models.Operation.inputs, + modeling.models.OperationTemplate.inputs + ) + + class PrefixedLogger(object): + def __init__(self, base_logger, task_id=None): + self._logger = base_logger + self._task_id = task_id + + def __getattr__(self, attribute): + if attribute.upper() in logging._levelNames: + return partial(self._logger_with_task_id, _level=attribute) + else: + return getattr(self._logger, attribute) + + def _logger_with_task_id(self, *args, **kwargs): + level = kwargs.pop('_level') + kwargs.setdefault('extra', {})['task_id'] = self._task_id + return getattr(self._logger, level)(*args, **kwargs) + + def __init__(self, + name, + service_id, + model_storage, + resource_storage, + execution_id, + workdir=None, + **kwargs): + super(BaseContext, self).__init__(**kwargs) + self._name = name + self._id = generate_uuid(variant='uuid') + self._model = model_storage + self._resource = resource_storage + self._service_id = service_id + self._workdir = workdir + self._execution_id = execution_id + self.logger = None + + def _register_logger(self, level=None, task_id=None): + self.logger = self.PrefixedLogger( + logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id) + self.logger.setLevel(level or logging.DEBUG) + if not self.logger.handlers: + self.logger.addHandler(self._get_sqla_handler()) + + def _get_sqla_handler(self): + return aria_logger.create_sqla_log_handler(model=self._model, + log_cls=modeling.models.Log, + execution_id=self._execution_id) + + def __repr__(self): + return ( + '{name}(name={self.name}, ' + 'deployment_id={self._service_id}, ' + .format(name=self.__class__.__name__, self=self)) + + @contextmanager + def logging_handlers(self, handlers=None): + handlers = handlers or [] + try: + for handler in handlers: + self.logger.addHandler(handler) + yield self.logger + finally: + for handler in handlers: + self.logger.removeHandler(handler) + + @property + def model(self): + """ + Storage model API ("MAPI"). + """ + return self._model + + @property + def resource(self): + """ + Storage resource API ("RAPI"). + """ + return self._resource + + @property + def service_template(self): + """ + Service template model. + """ + return self.service.service_template + + @property + def service(self): + """ + Service instance model. + """ + return self.model.service.get(self._service_id) + + @property + def name(self): + """ + Operation name. + """ + return self._name + + @property + def id(self): + """ + Operation ID. + """ + return self._id + + def download_resource(self, destination, path=None): + """ + Download a service template resource from the storage resource API ("RAPI"). + """ + try: + self.resource.service.download(entry_id=str(self.service.id), + destination=destination, + path=path) + except exceptions.StorageError: + self.resource.service_template.download(entry_id=str(self.service_template.id), + destination=destination, + path=path) + + def download_resource_and_render(self, destination, path=None, variables=None): + """ + Downloads a service template resource from the resource storage and renders its content as a + Jinja template using the provided variables. ``ctx`` is available to the template without + providing it explicitly. + """ + resource_content = self.get_resource(path=path) + resource_content = self._render_resource(resource_content=resource_content, + variables=variables) + with open(destination, 'wb') as f: + f.write(resource_content) + + def get_resource(self, path=None): + """ + Reads a service instance resource as string from the resource storage. + """ + try: + return self.resource.service.read(entry_id=str(self.service.id), path=path) + except exceptions.StorageError: + return self.resource.service_template.read(entry_id=str(self.service_template.id), + path=path) + + def get_resource_and_render(self, path=None, variables=None): + """ + Reads a service instance resource as string from the resource storage and renders it as a + Jinja template using the provided variables. ``ctx`` is available to the template without + providing it explicitly. + """ + resource_content = self.get_resource(path=path) + return self._render_resource(resource_content=resource_content, variables=variables) + + def _render_resource(self, resource_content, variables): + variables = variables or {} + variables.setdefault('ctx', self) + resource_template = jinja2.Template(resource_content) + return resource_template.render(variables) diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py new file mode 100644 index 0000000..e46e2b1 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/exceptions.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Context exceptions. +""" + +from ..exceptions import OrchestratorError + + +class ContextException(OrchestratorError): + """ + Context based exception + """ + pass diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py new file mode 100644 index 0000000..8613ec3 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/operation.py @@ -0,0 +1,174 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Operation contexts. +""" + +import threading +from contextlib import contextmanager + +import aria +from aria.utils import file +from . import common + + +class BaseOperationContext(common.BaseContext): + """ + Base class for contexts used during operation creation and execution. + """ + + def __init__(self, task_id, actor_id, **kwargs): + self._task_id = task_id + self._actor_id = actor_id + self._thread_local = threading.local() + self._destroy_session = kwargs.pop('destroy_session', False) + logger_level = kwargs.pop('logger_level', None) + super(BaseOperationContext, self).__init__(**kwargs) + self._register_logger(task_id=self.task.id, level=logger_level) + + def __repr__(self): + details = 'function={task.function}; ' \ + 'operation_arguments={task.arguments}'\ + .format(task=self.task) + return '{name}({0})'.format(details, name=self.name) + + @property + def task(self): + """ + The task in the model storage. + """ + # SQLAlchemy prevents from accessing an object which was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + + if not hasattr(self._thread_local, 'task'): + self._thread_local.task = self.model.task.get(self._task_id) + return self._thread_local.task + + @property + def plugin_workdir(self): + """ + A work directory that is unique to the plugin and the service ID. + """ + if self.task.plugin is None: + return None + plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir, + self.service.id, + self.task.plugin.name) + file.makedirs(plugin_workdir) + return plugin_workdir + + @property + def serialization_dict(self): + context_dict = { + 'name': self.name, + 'service_id': self._service_id, + 'task_id': self._task_id, + 'actor_id': self._actor_id, + 'workdir': self._workdir, + 'model_storage': self.model.serialization_dict if self.model else None, + 'resource_storage': self.resource.serialization_dict if self.resource else None, + 'execution_id': self._execution_id, + 'logger_level': self.logger.level + } + return { + 'context_cls': self.__class__, + 'context': context_dict + } + + @classmethod + def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): + if model_storage: + model_storage = aria.application_model_storage(**model_storage) + if resource_storage: + resource_storage = aria.application_resource_storage(**resource_storage) + + return cls(model_storage=model_storage, + resource_storage=resource_storage, + destroy_session=True, + **kwargs) + + def close(self): + if self._destroy_session: + self.model.log._session.remove() + self.model.log._engine.dispose() + + @property + @contextmanager + def persist_changes(self): + yield + self.model.task.update(self.task) + + +class NodeOperationContext(BaseOperationContext): + """ + Context for node operations. + """ + + @property + def node(self): + """ + The node of the current operation. + """ + return self.model.node.get(self._actor_id) + + @property + def node_template(self): + """ + The node template of the current operation. + """ + return self.node.node_template + + +class RelationshipOperationContext(BaseOperationContext): + """ + Context for relationship operations. + """ + + @property + def relationship(self): + """ + The relationship instance of the current operation. + """ + return self.model.relationship.get(self._actor_id) + + @property + def source_node(self): + """ + The relationship source node. + """ + return self.relationship.source_node + + @property + def source_node_template(self): + """ + The relationship source node template. + """ + return self.source_node.node_template + + @property + def target_node(self): + """ + The relationship target node. + """ + return self.relationship.target_node + + @property + def target_node_template(self): + """ + The relationship target node template. + """ + return self.target_node.node_template diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py new file mode 100644 index 0000000..a2e1122 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/toolbelt.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tools for operations. +""" + +from . import operation + + +class NodeToolBelt(object): + """ + Node operation tool belt. + """ + def __init__(self, operation_context): + self._op_context = operation_context + + @property + def host_ip(self): + """ + The host ip of the current node + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + return self._op_context.node.host.attributes.get('ip') + + +class RelationshipToolBelt(object): + """ + Relationship operation tool belt. + """ + def __init__(self, operation_context): + self._op_context = operation_context + + +def toolbelt(operation_context): + """ + Get a toolbelt from to the current operation executor. + + :param operation_context: + """ + if isinstance(operation_context, operation.NodeOperationContext): + return NodeToolBelt(operation_context) + elif isinstance(operation_context, operation.RelationshipOperationContext): + return RelationshipToolBelt(operation_context) + else: + raise RuntimeError("Operation context not supported") diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py new file mode 100644 index 0000000..738d2fd --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow context. +""" + +import threading +from contextlib import contextmanager + +from .exceptions import ContextException +from .common import BaseContext + + +class WorkflowContext(BaseContext): + """ + Context used during workflow creation and execution. + """ + def __init__(self, + workflow_name, + parameters=None, + task_max_attempts=1, + task_retry_interval=0, + task_ignore_failure=False, + *args, **kwargs): + super(WorkflowContext, self).__init__(*args, **kwargs) + self._workflow_name = workflow_name + self._parameters = parameters or {} + self._task_max_attempts = task_max_attempts + self._task_retry_interval = task_retry_interval + self._task_ignore_failure = task_ignore_failure + self._execution_graph = None + self._register_logger() + + def __repr__(self): + return ( + '{name}(deployment_id={self._service_id}, ' + 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format( + name=self.__class__.__name__, self=self)) + + @property + def workflow_name(self): + return self._workflow_name + + @property + def execution(self): + """ + Execution model. + """ + return self.model.execution.get(self._execution_id) + + @execution.setter + def execution(self, value): + """ + Stores the execution in the storage model API ("MAPI"). + """ + self.model.execution.put(value) + + @property + def node_templates(self): + """ + Iterates over nodes templates. + """ + key = 'service_{0}'.format(self.model.node_template.model_cls.name_column_name()) + + return self.model.node_template.iter( + filters={ + key: getattr(self.service, self.service.name_column_name()) + } + ) + + @property + def nodes(self): + """ + Iterates over nodes. + """ + key = 'service_{0}'.format(self.model.node.model_cls.name_column_name()) + return self.model.node.iter( + filters={ + key: getattr(self.service, self.service.name_column_name()) + } + ) + + @property + @contextmanager + def persist_changes(self): + yield + self._model.execution.update(self.execution) + + +class _CurrentContext(threading.local): + """ + Provides a thread-level context, with sugar for the task MAPI. + """ + + def __init__(self): + super(_CurrentContext, self).__init__() + self._workflow_context = None + + def _set(self, value): + self._workflow_context = value + + def get(self): + """ + Retrieves the current workflow context. + """ + if self._workflow_context is not None: + return self._workflow_context + raise ContextException("No context was set") + + @contextmanager + def push(self, workflow_context): + """ + Switches the current context to the provided context. + """ + prev_workflow_context = self._workflow_context + self._set(workflow_context) + try: + yield self + finally: + self._set(prev_workflow_context) + +current = _CurrentContext() |