summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py
blob: f5ca30233febe347d1a8408ba5a3cf7cbc571f92 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Threading utilities.
"""

from __future__ import absolute_import  # so we can import standard 'threading'

import sys
import itertools
import multiprocessing
from threading import (Thread, Lock)
from Queue import (Queue, Full, Empty)

from .exceptions import print_exception

class ExecutorException(Exception):
    """
    Raised by :class:`FixedThreadPoolExecutor` when a task cannot be submitted or the pool cannot
    be shut down cleanly.
    """


class DaemonThread(Thread):
    """
    A ``Thread`` that always runs as a daemon and silently swallows exceptions raised during
    interpreter shutdown (see :meth:`run`).
    """

    def __init__(self, *args, **kwargs):
        super(DaemonThread, self).__init__(*args, **kwargs)
        # Daemon threads do not keep the interpreter alive
        self.daemon = True

    def run(self):
        """
        We're overriding ``Thread.run`` in order to avoid annoying (but harmless) error messages
        during shutdown. The problem is that CPython nullifies the global state _before_ shutting
        down daemon threads, so that exceptions might happen, and then ``Thread.__bootstrap_inner``
        prints them out.

        Our solution is to swallow these exceptions here.

        The side effect is that uncaught exceptions in our own thread code will _not_ be printed out
        as usual, so it's our responsibility to catch them in our code.
        """

        try:
            super(DaemonThread, self).run()
        except SystemExit:
            # This exception should be bubbled up; a bare "raise" preserves the original
            # traceback, whereas "raise e" would rebuild it from this line
            raise
        except BaseException:
            # Exceptions might occur in daemon threads during interpreter shutdown
            pass


# https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
class FixedThreadPoolExecutor(object):
    """
    Executes tasks in a fixed thread pool.

    Makes sure to gather all returned results and thrown exceptions in one place, in order of task
    submission.

    Example::

        def sum(arg1, arg2):
            return arg1 + arg2

        executor = FixedThreadPoolExecutor(10)
        try:
            for value in range(100):
                executor.submit(sum, value, value)
            executor.drain()
        except:
            executor.close()
        executor.raise_first()
        print executor.returns

    You can also use it with the Python ``with`` keyword, in which case you don't need to call
    ``close`` explicitly::

        with FixedThreadPoolExecutor(10) as executor:
            for value in range(100):
                executor.submit(sum, value, value)
            executor.drain()
            executor.raise_first()
            print executor.returns
    """

    _CYANIDE = object()  # Special task marker used to kill worker threads.

    def __init__(self,
                 size=None,
                 timeout=None,
                 print_exceptions=False):
        """
        :param size: number of threads in the pool; if ``None`` will use an optimal number for the
         platform
        :param timeout: timeout in seconds for all blocking operations (``None`` means no timeout)
        :param print_exceptions: set to ``True`` in order to print exceptions from tasks
        """
        if not size:
            try:
                # Heuristic: enough threads to keep every core busy plus one spare
                size = multiprocessing.cpu_count() * 2 + 1
            except NotImplementedError:
                size = 3

        self.size = size
        self.timeout = timeout
        self.print_exceptions = print_exceptions

        self._tasks = Queue()
        self._returns = {}       # task id -> returned value
        self._exceptions = {}    # task id -> raised exception
        self._id_creator = itertools.count()
        self._lock = Lock()      # for console output

        self._workers = []
        for index in range(size):
            worker = DaemonThread(
                name='%s%d' % (self.__class__.__name__, index),
                target=self._thread_worker)
            worker.start()
            self._workers.append(worker)

    def submit(self, func, *args, **kwargs):
        """
        Submit a task for execution.

        The task will be called ASAP on the next available worker thread in the pool.

        :raises ExecutorException: if cannot be submitted
        """

        try:
            # next() works on both Python 2 and 3 (the old iterator.next() was Python-2-only)
            self._tasks.put((next(self._id_creator), func, args, kwargs), timeout=self.timeout)
        except Full:
            raise ExecutorException('cannot submit task: queue is full')

    def close(self):
        """
        Blocks until all current tasks finish execution and all worker threads are dead.

        You cannot submit tasks anymore after calling this.

        This is called automatically upon exit if you are using the ``with`` keyword.
        """

        self.drain()
        # Keep feeding "cyanide" markers until every worker has swallowed one and died
        while self.is_alive:
            try:
                self._tasks.put(self._CYANIDE, timeout=self.timeout)
            except Full:
                raise ExecutorException('cannot close executor: a thread seems to be hanging')
        self._workers = None

    def drain(self):
        """
        Blocks until all current tasks finish execution, but leaves the worker threads alive.
        """

        self._tasks.join()  # oddly, the API does not support a timeout parameter

    @property
    def is_alive(self):
        """
        True if any of the worker threads are alive.
        """

        # After close() the worker list is None; report "not alive" instead of crashing
        if not self._workers:
            return False
        return any(worker.is_alive() for worker in self._workers)

    @property
    def returns(self):
        """
        The returned values from all tasks, in order of submission.
        """

        return [self._returns[k] for k in sorted(self._returns)]

    @property
    def exceptions(self):
        """
        The raised exceptions from all tasks, in order of submission.
        """

        return [self._exceptions[k] for k in sorted(self._exceptions)]

    def raise_first(self):
        """
        If exceptions were thrown by any task, then the first one will be raised.

        This is rather arbitrary: proper handling would involve iterating all the exceptions.
        However, if you want to use the "raise" mechanism, you are limited to raising only one of
        them.
        """

        exceptions = self.exceptions
        if exceptions:
            raise exceptions[0]

    def _thread_worker(self):
        # Worker main loop: keep pulling tasks until we swallow a cyanide marker
        while True:
            if not self._execute_next_task():
                break

    def _execute_next_task(self):
        """
        Pulls and runs one task; returns ``False`` only when the cyanide marker is received.
        """
        try:
            task = self._tasks.get(timeout=self.timeout)
        except Empty:
            # Happens if timeout is reached
            return True
        # Identity comparison: _CYANIDE is a unique sentinel object
        if task is self._CYANIDE:
            # Time to die :(
            return False
        self._execute_task(*task)
        return True

    def _execute_task(self, task_id, func, args, kwargs):
        """
        Runs a single task, recording its return value or exception under its submission id.
        """
        try:
            result = func(*args, **kwargs)
            self._returns[task_id] = result
        except Exception as e:
            self._exceptions[task_id] = e
            if self.print_exceptions:
                with self._lock:
                    print_exception(e)
        finally:
            # Always mark the task done -- even if func raised a BaseException that is not an
            # Exception (e.g. SystemExit) -- otherwise drain()'s Queue.join() would hang forever
            self._tasks.task_done()

    def __enter__(self):
        return self

    def __exit__(self, the_type, value, traceback):
        self.close()
        return False


class LockedList(list):
    """
    A ``list`` subclass usable with the ``with`` keyword via a built-in lock.

    Python lists are thread-safe in the sense that concurrent access will not raise exceptions,
    but they do not guarantee atomicity across several operations. Entering this list as a
    context manager holds its lock for the duration of the block, giving you atomicity when
    needed.
    """

    def __init__(self, *args, **kwargs):
        super(LockedList, self).__init__(*args, **kwargs)
        # One lock per list instance; exposed so callers may also use it directly
        self.lock = Lock()

    def __enter__(self):
        # Delegate straight to the lock's own context-manager protocol
        return self.lock.__enter__()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        return self.lock.__exit__(exc_type, exc_value, exc_traceback)


class ExceptionThread(Thread):
    """
    A thread from which top level exceptions can be retrieved or re-raised.
    """
    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)
        # Holds the ``sys.exc_info()`` triple of the first uncaught exception raised in ``run``,
        # or ``None`` while no exception has occurred
        self.exception = None
        # Do not keep the interpreter alive on account of this thread
        self.daemon = True

    def run(self):
        try:
            super(ExceptionThread, self).run()
        except BaseException:
            # Capture (type, value, traceback) so the spawning thread can inspect or re-raise it
            self.exception = sys.exc_info()

    def is_error(self):
        # True once the thread's target has raised an uncaught exception
        return self.exception is not None

    def raise_error_if_exists(self):
        if self.is_error():
            type_, value, trace = self.exception
            # Python 2 three-argument raise: re-raises the captured exception with its
            # original traceback intact
            raise type_, value, trace