diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py new file mode 100644 index 0000000..170620e --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/executor/thread.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Thread task executor. +""" + +import Queue +import threading + +import sys + +from aria.utils import imports, exceptions + +from .base import BaseExecutor + + +class ThreadExecutor(BaseExecutor): + """ + Thread task executor. + + It's easier writing tests using this executor rather than the full-blown sub-process executor. + + Note: This executor is incapable of running plugin operations. + """ + + def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs): + super(ThreadExecutor, self).__init__(*args, **kwargs) + self._stopped = False + self._close_timeout = close_timeout + self._queue = Queue.Queue() + self._pool = [] + for i in range(pool_size): + name = 'ThreadExecutor-{index}'.format(index=i+1) + thread = threading.Thread(target=self._processor, name=name) + thread.daemon = True + thread.start() + self._pool.append(thread) + + def _execute(self, ctx): + self._queue.put(ctx) + + def close(self): + self._stopped = True + for thread in self._pool: + if self._close_timeout is None: + thread.join() + else: + thread.join(self._close_timeout) + + def _processor(self): + while not self._stopped: + try: + ctx = self._queue.get(timeout=1) + self._task_started(ctx) + try: + task_func = imports.load_attribute(ctx.task.function) + arguments = dict(arg.unwrapped for arg in ctx.task.arguments.itervalues()) + task_func(ctx=ctx, **arguments) + self._task_succeeded(ctx) + except BaseException as e: + self._task_failed(ctx, + exception=e, + traceback=exceptions.get_exception_as_string(*sys.exc_info())) + # Daemon threads + except BaseException as e: + pass |