summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/context/workflow.py
blob: 738d2fd335184de1da07656a15f6e32fdab798ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Workflow context.
"""

import threading
from contextlib import contextmanager

from .exceptions import ContextException
from .common import BaseContext


class WorkflowContext(BaseContext):
    """
    Context object available while a workflow is being created and executed.

    Keeps the workflow name, the workflow parameters and the task-level settings it was
    constructed with, and offers shortcuts over the storage reached through ``self.model``.
    """

    def __init__(self,
                 workflow_name,
                 parameters=None,
                 task_max_attempts=1,
                 task_retry_interval=0,
                 task_ignore_failure=False,
                 *args, **kwargs):
        """
        :param workflow_name: name of the workflow this context is bound to
        :param parameters: workflow parameters; any falsy value is replaced by a new empty dict
        :param task_max_attempts: stored unchanged on the context
        :param task_retry_interval: stored unchanged on the context
        :param task_ignore_failure: stored unchanged on the context
        :param args: positional arguments forwarded to the base class initializer
        :param kwargs: keyword arguments forwarded to the base class initializer
        """
        super(WorkflowContext, self).__init__(*args, **kwargs)

        self._workflow_name = workflow_name
        self._parameters = parameters if parameters else {}

        self._task_max_attempts = task_max_attempts
        self._task_retry_interval = task_retry_interval
        self._task_ignore_failure = task_ignore_failure

        self._execution_graph = None

        # Not defined in this class; presumably inherited from the base context.
        self._register_logger()

    def __repr__(self):
        # Note: the value shown under the ``deployment_id`` label is ``self._service_id``.
        template = (
            '{name}(deployment_id={self._service_id}, '
            'workflow_name={self._workflow_name}, execution_id={self._execution_id})'
        )
        return template.format(name=self.__class__.__name__, self=self)

    @property
    def workflow_name(self):
        """
        Name of the workflow, as passed to the initializer.
        """
        return self._workflow_name

    @property
    def execution(self):
        """
        Execution model, looked up in the model storage by ``self._execution_id``.
        """
        execution_api = self.model.execution
        return execution_api.get(self._execution_id)

    @execution.setter
    def execution(self, value):
        """
        Puts the given execution into the storage model API ("MAPI").
        """
        execution_api = self.model.execution
        execution_api.put(value)

    @property
    def node_templates(self):
        """
        Iterates over the node templates selected by a filter on the current service.

        The filter key is ``service_`` followed by the node template model's name column; the
        filter value is the current service's own name-column attribute.
        """
        template_api = self.model.node_template
        filter_key = 'service_{0}'.format(template_api.model_cls.name_column_name())
        service_name = getattr(self.service, self.service.name_column_name())
        return template_api.iter(filters={filter_key: service_name})

    @property
    def nodes(self):
        """
        Iterates over the nodes selected by a filter on the current service.

        The filter key is ``service_`` followed by the node model's name column; the filter
        value is the current service's own name-column attribute.
        """
        node_api = self.model.node
        filter_key = 'service_{0}'.format(node_api.model_cls.name_column_name())
        service_name = getattr(self.service, self.service.name_column_name())
        return node_api.iter(filters={filter_key: service_name})

    @property
    @contextmanager
    def persist_changes(self):
        """
        Context manager that updates the stored execution once the ``with`` body completes.

        The update is not wrapped in ``try``/``finally``, so it is skipped when the body raises.
        """
        yield
        current_execution = self.execution
        self._model.execution.update(current_execution)


class _CurrentContext(threading.local):
    """
    Thread-local holder for the active workflow context.

    The class derives from ``threading.local``, so the stored context is kept per thread.
    """

    def __init__(self):
        super(_CurrentContext, self).__init__()
        # ``None`` means that no workflow context has been set.
        self._workflow_context = None

    def _set(self, value):
        """
        Replaces the stored workflow context with ``value``.
        """
        self._workflow_context = value

    def get(self):
        """
        Retrieves the workflow context that is currently stored.

        :raises ContextException: if the stored context is ``None``
        """
        active = self._workflow_context
        if active is None:
            raise ContextException("No context was set")
        return active

    @contextmanager
    def push(self, workflow_context):
        """
        Makes ``workflow_context`` the current context for the duration of the ``with`` body.

        Yields this holder itself; the previously stored context is put back on exit, including
        when the body raises.
        """
        previous = self._workflow_context
        self._set(workflow_context)
        try:
            yield self
        finally:
            self._set(previous)

# Module-level ``_CurrentContext`` instance; that class derives from ``threading.local``,
# so the context stored on it is kept separately for each thread.
current = _CurrentContext()