summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/adapters/extension.py
blob: bab472e0c8e7cdc4276bef5c9d072adb998debad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#
# Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

from functools import wraps
from contextlib import contextmanager

from aria import extension as aria_extension

from .context_adapter import CloudifyContextAdapter


@aria_extension.process_executor
class CloudifyExecutorExtension(object):

    def decorate(self):
        def decorator(function):
            @wraps(function)
            def wrapper(ctx, **operation_inputs):
                # We assume that any Cloudify-based plugin would use the plugins-common, thus two
                # different paths are created
                is_cloudify_dependent = ctx.task.plugin and any(
                    'cloudify_plugins_common' in w for w in ctx.task.plugin.wheels)

                if is_cloudify_dependent:
                    from cloudify import context
                    from cloudify.exceptions import (NonRecoverableError, RecoverableError)

                    with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
                        # We need to create a new class dynamically, since CloudifyContextAdapter
                        # doesn't exist at runtime
                        ctx_adapter = type('_CloudifyContextAdapter',
                                           (CloudifyContextAdapter, context.CloudifyContext),
                                           {}, )(ctx)

                        exception = None
                        with _push_cfy_ctx(ctx_adapter, operation_inputs):
                            try:
                                function(ctx=ctx_adapter, **operation_inputs)
                            except NonRecoverableError as e:
                                ctx.task.abort(str(e))
                            except RecoverableError as e:
                                ctx.task.retry(str(e), retry_interval=e.retry_after)
                            except BaseException as e:
                                # Keep exception and raise it outside of "with", because
                                # contextmanager does not allow raising exceptions
                                exception = e
                        if exception is not None:
                            raise exception
                else:
                    function(ctx=ctx, **operation_inputs)
            return wrapper
        return decorator


@contextmanager
def _push_cfy_ctx(ctx, params):
    from cloudify import state

    try:
        # Support for Cloudify > 4.0
        with state.current_ctx.push(ctx, params) as current_ctx:
            yield current_ctx

    except AttributeError:
        # Support for Cloudify < 4.0
        try:
            original_ctx = state.current_ctx.get_ctx()
        except RuntimeError:
            original_ctx = None
        try:
            original_params = state.current_ctx.get_parameters()
        except RuntimeError:
            original_params = None

        state.current_ctx.set(ctx, params)
        try:
            yield state.current_ctx.get_ctx()
        finally:
            state.current_ctx.set(original_ctx, original_params)