1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
#
# Copyright (c) 2017 GigaSpaces Technologies Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from functools import wraps
from contextlib import contextmanager
from aria import extension as aria_extension
from .context_adapter import CloudifyContextAdapter
@aria_extension.process_executor
class CloudifyExecutorExtension(object):
def decorate(self):
def decorator(function):
@wraps(function)
def wrapper(ctx, **operation_inputs):
# We assume that any Cloudify-based plugin would use the plugins-common, thus two
# different paths are created
is_cloudify_dependent = ctx.task.plugin and any(
'cloudify_plugins_common' in w for w in ctx.task.plugin.wheels)
if is_cloudify_dependent:
from cloudify import context
from cloudify.exceptions import (NonRecoverableError, RecoverableError)
with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
# We need to create a new class dynamically, since CloudifyContextAdapter
# doesn't exist at runtime
ctx_adapter = type('_CloudifyContextAdapter',
(CloudifyContextAdapter, context.CloudifyContext),
{}, )(ctx)
exception = None
with _push_cfy_ctx(ctx_adapter, operation_inputs):
try:
function(ctx=ctx_adapter, **operation_inputs)
except NonRecoverableError as e:
ctx.task.abort(str(e))
except RecoverableError as e:
ctx.task.retry(str(e), retry_interval=e.retry_after)
except BaseException as e:
# Keep exception and raise it outside of "with", because
# contextmanager does not allow raising exceptions
exception = e
if exception is not None:
raise exception
else:
function(ctx=ctx, **operation_inputs)
return wrapper
return decorator
@contextmanager
def _push_cfy_ctx(ctx, params):
from cloudify import state
try:
# Support for Cloudify > 4.0
with state.current_ctx.push(ctx, params) as current_ctx:
yield current_ctx
except AttributeError:
# Support for Cloudify < 4.0
try:
original_ctx = state.current_ctx.get_ctx()
except RuntimeError:
original_ctx = None
try:
original_params = state.current_ctx.get_parameters()
except RuntimeError:
original_params = None
state.current_ctx.set(ctx, params)
try:
yield state.current_ctx.get_ctx()
finally:
state.current_ctx.set(original_ctx, original_params)
|