summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py b/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py
new file mode 100644
index 0000000..f5ca302
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py
@@ -0,0 +1,286 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Threading utilities.
+"""
+
+from __future__ import absolute_import # so we can import standard 'threading'
+
+import sys
+import itertools
+import multiprocessing
+from threading import (Thread, Lock)
+from Queue import (Queue, Full, Empty)
+
+from .exceptions import print_exception
+
class ExecutorException(Exception):
    """
    Raised by ``FixedThreadPoolExecutor`` when a task cannot be submitted (the task queue is
    full) or the executor cannot be closed (a worker thread seems to be hanging).
    """
+
+
class DaemonThread(Thread):
    """
    A ``Thread`` that is always a daemon and stays quiet during interpreter shutdown.
    """

    def __init__(self, *args, **kwargs):
        super(DaemonThread, self).__init__(*args, **kwargs)
        # Daemon threads do not block interpreter exit
        self.daemon = True

    def run(self):
        """
        Overrides ``Thread.run`` to avoid annoying (but harmless) error messages during
        shutdown: CPython nullifies the global state _before_ stopping daemon threads, so
        exceptions may be raised and then printed out by ``Thread.__bootstrap_inner``.

        We swallow those exceptions here. The side effect is that uncaught exceptions raised
        by our own thread code will _not_ be printed out as usual, so it is our responsibility
        to catch them in our code.
        """

        try:
            super(DaemonThread, self).run()
        except BaseException as e:
            if isinstance(e, SystemExit):
                # Interpreter-exit requests should be bubbled up
                raise e
            # Anything else is assumed to be interpreter-shutdown noise; ignore it
+
+
+# https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
class FixedThreadPoolExecutor(object):
    """
    Executes tasks in a fixed thread pool.

    Makes sure to gather all returned results and thrown exceptions in one place, in order of task
    submission.

    Example::

        def sum(arg1, arg2):
            return arg1 + arg2

        executor = FixedThreadPoolExecutor(10)
        try:
            for value in range(100):
                executor.submit(sum, value, value)
            executor.drain()
        except:
            executor.close()
        executor.raise_first()
        print executor.returns

    You can also use it with the Python ``with`` keyword, in which case you don't need to call
    ``close`` explicitly::

        with FixedThreadPoolExecutor(10) as executor:
            for value in range(100):
                executor.submit(sum, value, value)
            executor.drain()
            executor.raise_first()
            print executor.returns
    """

    _CYANIDE = object()  # Special task marker used to kill worker threads.

    def __init__(self,
                 size=None,
                 timeout=None,
                 print_exceptions=False):
        """
        :param size: number of threads in the pool; if ``None`` will use an optimal number for the
         platform
        :param timeout: timeout in seconds for all blocking operations (``None`` means no timeout)
        :param print_exceptions: set to ``True`` in order to print exceptions from tasks
        """
        if not size:
            try:
                # Heuristic default: threads block on I/O, so allow more than one per CPU
                size = multiprocessing.cpu_count() * 2 + 1
            except NotImplementedError:
                size = 3

        self.size = size
        self.timeout = timeout
        self.print_exceptions = print_exceptions

        self._tasks = Queue()
        self._returns = {}     # task id -> returned value
        self._exceptions = {}  # task id -> raised exception
        self._id_creator = itertools.count()
        self._lock = Lock()  # for console output

        self._workers = []
        for index in range(size):
            worker = DaemonThread(
                name='%s%d' % (self.__class__.__name__, index),
                target=self._thread_worker)
            worker.start()
            self._workers.append(worker)

    def submit(self, func, *args, **kwargs):
        """
        Submit a task for execution.

        The task will be called ASAP on the next available worker thread in the pool.

        :raises ExecutorException: if cannot be submitted
        """

        try:
            # next() builtin (rather than the Python-2-only .next() method) keeps this portable
            self._tasks.put((next(self._id_creator), func, args, kwargs), timeout=self.timeout)
        except Full:
            raise ExecutorException('cannot submit task: queue is full')

    def close(self):
        """
        Blocks until all current tasks finish execution and all worker threads are dead.

        You cannot submit tasks anymore after calling this.

        This is called automatically upon exit if you are using the ``with`` keyword.
        """

        self.drain()
        while self.is_alive:
            try:
                # Each cyanide marker consumed kills exactly one worker thread
                self._tasks.put(self._CYANIDE, timeout=self.timeout)
            except Full:
                raise ExecutorException('cannot close executor: a thread seems to be hanging')
        self._workers = None

    def drain(self):
        """
        Blocks until all current tasks finish execution, but leaves the worker threads alive.
        """

        self._tasks.join()  # oddly, the API does not support a timeout parameter

    @property
    def is_alive(self):
        """
        True if any of the worker threads are alive.
        """

        # After close() the worker list is None; report not-alive instead of raising TypeError
        for worker in self._workers or ():
            if worker.is_alive():
                return True
        return False

    @property
    def returns(self):
        """
        The returned values from all tasks, in order of submission.
        """

        return [self._returns[k] for k in sorted(self._returns)]

    @property
    def exceptions(self):
        """
        The raised exceptions from all tasks, in order of submission.
        """

        return [self._exceptions[k] for k in sorted(self._exceptions)]

    def raise_first(self):
        """
        If exceptions were thrown by any task, then the first one will be raised.

        This is rather arbitrary: proper handling would involve iterating all the exceptions.
        However, if you want to use the "raise" mechanism, you are limited to raising only one of
        them.
        """

        exceptions = self.exceptions
        if exceptions:
            raise exceptions[0]

    def _thread_worker(self):
        # Worker main loop: keep consuming tasks until we swallow a cyanide marker
        while True:
            if not self._execute_next_task():
                break

    def _execute_next_task(self):
        # Fetches and runs one task; returns False when this worker should die
        try:
            task = self._tasks.get(timeout=self.timeout)
        except Empty:
            # Happens if timeout is reached; just keep waiting
            return True
        if task is self._CYANIDE:
            # Identity (not equality) check: the marker is a unique sentinel object and
            # equality against arbitrary task payloads could be overridden
            return False
        self._execute_task(*task)
        return True

    def _execute_task(self, task_id, func, args, kwargs):
        # Runs one task, recording its return value or exception under its submission id
        try:
            self._returns[task_id] = func(*args, **kwargs)
        except Exception as e:
            self._exceptions[task_id] = e
            if self.print_exceptions:
                with self._lock:
                    print_exception(e)
        finally:
            # Must run even if func raised a non-Exception BaseException (e.g. SystemExit);
            # otherwise the queue's join count never balances and drain()/close() hang forever
            self._tasks.task_done()

    def __enter__(self):
        return self

    def __exit__(self, the_type, value, traceback):
        self.close()
        return False
+
+
class LockedList(list):
    """
    A list that supports the ``with`` keyword with a built-in lock.

    Though Python lists are thread-safe in that they will not raise exceptions during concurrent
    access, they do not guarantee atomicity. This class will let you gain atomicity when needed.
    """

    def __init__(self, *args, **kwargs):
        super(LockedList, self).__init__(*args, **kwargs)
        self.lock = Lock()

    def __enter__(self):
        # Equivalent to delegating to self.lock.__enter__, which is acquire()
        return self.lock.acquire()

    def __exit__(self, the_type, value, traceback):
        # Lock.__exit__ releases and returns None, so exceptions propagate unchanged
        self.lock.release()
+
+
class ExceptionThread(Thread):
    """
    A thread from which top level exceptions can be retrieved or re-raised.
    """
    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)
        self.exception = None  # holds sys.exc_info() if run() raised, else None
        self.daemon = True  # do not block interpreter exit

    def run(self):
        # Run the thread's target, capturing any exception -- including SystemExit
        # and KeyboardInterrupt -- instead of letting it print and die silently.
        try:
            super(ExceptionThread, self).run()
        except BaseException:
            self.exception = sys.exc_info()

    def is_error(self):
        # True if this thread's run() raised an exception.
        return self.exception is not None

    def raise_error_if_exists(self):
        # Re-raise the captured exception (if any) in the caller's thread.
        if self.is_error():
            type_, value, trace = self.exception
            # Python 2 three-argument raise: re-raises with the original traceback
            raise type_, value, trace