summaryrefslogtreecommitdiffstats
path: root/python-dcae-policy/dcaepolicy/dcae_policy.py
blob: 1ae9b8f4d8b26b40268b692b866c57437db98c54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
"""dcae_policy contains decorators for the policy lifecycle in cloudify"""

# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.


import json
import uuid
import copy
from functools import wraps

import requests

from cloudify import ctx
from cloudify.context import NODE_INSTANCE
from cloudify.exceptions import NonRecoverableError

from .dcae_consul_client import ConsulClient

# --- policy-handler access ---------------------------------------------------
# service name handed to ConsulClient.get_service_url to locate policy-handler
SERVICE_NAME_POLICY_HANDLER = "policy_handler"
# http header that carries the generated request id on calls to policy-handler
X_ECOMP_REQUESTID = "X-ECOMP-RequestID"

# --- keys in runtime_properties ----------------------------------------------
# holds the dict of policies (keyed by policy_id) gathered on a node instance
POLICIES = "policies"
# holds the policy body on the policy node; also the field name in a policy object
POLICY_BODY = "policy_body"

# --- properties of the policy node / fields of the policy object -------------
POLICY_ID = "policy_id"
POLICY_APPLY_MODE = "policy_apply_mode"
# value of policy_apply_mode that asks for the app to be notified through the script
POLICY_NOTIFICATION_SCRIPT = "script"
# fields inside the policy body
POLICY_VERSION = "policyVersion"
POLICY_CONFIG = "config"

# node type looked up in type_hierarchy to recognize the policy nodes
DCAE_POLICY_TYPE = "dcae.nodes.policy"
# not referenced by the code visible in this file
POLICY_MESSAGE_TYPE = "policy"

class Policies(object):
    """static class for policy operations

    Groups the decorators and helpers that bring the policies from policy-handler
    into ctx.instance.runtime_properties and merge the policy configs into a config.
    All members are static - nothing in this file instantiates the class.
    """
    # url of policy-handler - resolved through ConsulClient on first use, then reused
    _policy_handler_url = None

    @staticmethod
    def _get_latest_policy(policy_id):
        """retrieve the latest policy for policy_id from policy-handler

        :policy_id: id of the policy to ask policy-handler for

        Returns the decoded json of the response on http status 200,
        an empty dict on any other status that is not an http error.
        Http error statuses are raised by res.raise_for_status().
        """
        if not Policies._policy_handler_url:
            Policies._policy_handler_url = ConsulClient.get_service_url(SERVICE_NAME_POLICY_HANDLER)

        ph_path = "{0}/policy_latest/{1}".format(Policies._policy_handler_url, policy_id)
        # fresh request id on every call
        headers = {X_ECOMP_REQUESTID: str(uuid.uuid4())}

        ctx.logger.info("getting latest policy from {0} headers={1}".format(
            ph_path, json.dumps(headers)))
        # NOTE(review): no timeout is passed to requests.get - consider adding one
        # so that an unresponsive policy-handler cannot block the operation
        res = requests.get(ph_path, headers=headers)
        res.raise_for_status()

        if res.status_code == requests.codes.ok:
            return res.json()
        return {}

    @staticmethod
    def populate_policy_on_node(func):
        """dcae.nodes.policy node retrieves the policy_body identified by policy_id property
        from policy-handler that gets it from policy-engine.

        Places the found policy into runtime_properties["policy_body"].

        The decorated func is always invoked outside of the policy retrieval error handling,
        so the exceptions raised by func itself propagate unchanged and are not
        reported as a failure to get the policy.

        Raises NonRecoverableError when the policy is not found or the retrieval fails.
        Returns None instead of a decorator result when func is empty.
        """
        if not func:
            return

        @wraps(func)
        def wrapper(*args, **kwargs):
            """retrieve and save the latest policy body per policy_id"""
            try:
                # the policy is only retrieved for a node instance that has policy_id
                if ctx.type == NODE_INSTANCE:
                    if POLICY_ID not in ctx.node.properties:
                        ctx.logger.error("no {0} found in ctx.node.properties".format(POLICY_ID))
                    else:
                        policy_id = ctx.node.properties[POLICY_ID]
                        policy = Policies._get_latest_policy(policy_id)
                        if not policy:
                            error = "policy not found for policy_id {0}".format(policy_id)
                            ctx.logger.error(error)
                            raise NonRecoverableError(error)

                        ctx.logger.info("found policy {0}".format(json.dumps(policy)))
                        if POLICY_BODY in policy:
                            ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY]

            except Exception as ex:
                # any failure on retrieval (including not found above) is non-recoverable
                error = "Failed to get the policy {0}".format(str(ex))
                ctx.logger.error(error)
                raise NonRecoverableError(error)

            return func(*args, **kwargs)
        return wrapper

    @staticmethod
    def gather_policies_to_node(func):
        """decorate with @Policies.gather_policies_to_node to
        gather the policies from dcae.nodes.policy nodes this node depends on.

        Places the policies into runtime_properties["policies"] as a dict
        of policy_id -> policy node properties + policy_body.
        Nothing is stored when no policy is found through the relationships.

        Call Policies.shallow_merge_policies_into(config) to merge the policies into config.

        Raises NonRecoverableError when gathering the policies fails.
        Returns None instead of a decorator result when func is empty.
        """
        def _merge_policy_with_node(target):
            """get all properties of the policy node and add the actual policy"""
            policy = dict(target.node.properties)
            if POLICY_BODY in target.instance.runtime_properties:
                policy[POLICY_BODY] = target.instance.runtime_properties[POLICY_BODY]
            return policy

        if not func:
            return

        @wraps(func)
        def wrapper(*args, **kwargs):
            """gather and save the policies from dcae.nodes.policy nodes this node related to"""
            try:
                if ctx.type == NODE_INSTANCE:
                    # only the related nodes of the policy type with non-empty policy_id
                    policies = {
                        rel.target.node.properties[POLICY_ID]: _merge_policy_with_node(rel.target)
                        for rel in ctx.instance.relationships
                        if DCAE_POLICY_TYPE in rel.target.node.type_hierarchy
                        and POLICY_ID in rel.target.node.properties
                        and rel.target.node.properties[POLICY_ID]
                    }
                    if policies:
                        ctx.instance.runtime_properties[POLICIES] = policies
            except Exception as ex:
                error = "Failed to set the policies {0}".format(str(ex))
                ctx.logger.error(error)
                raise NonRecoverableError(error)

            return func(*args, **kwargs)
        return wrapper

    @staticmethod
    def _update_policies_on_ctx(updated_policies):
        """update policies in runtime_properties and return changed_policies

        :updated_policies: list of policy objects, each expected to have
            policy_id and policy_body with policyVersion

        Each updated policy ends up in one of the groups:
            ignored - no policy_id or the policy_id is not among the policies of this node
            unexpected - policy_body is missing, is not a dict, or has no policyVersion
            same - the deployed policy_body already has the same policyVersion
            changed - policy_body is stored on the policy in runtime_properties

        Returns the list of copies of the changed policies,
        None when there are no policies on the node instance or nothing changed.
        """
        if POLICIES not in ctx.instance.runtime_properties:
            return
        if not updated_policies:
            ctx.logger.error("update_policies_on_ctx - no updated_policies provided in arguments")
            return

        policies = ctx.instance.runtime_properties[POLICIES]
        ctx.logger.info("update_policies_on_ctx: {0}".format(json.dumps(updated_policies)))
        changed_policies = []
        ignored_policies = []
        unexpected_policies = []
        same_policies = []
        for policy in updated_policies:
            if POLICY_ID not in policy or policy[POLICY_ID] not in policies:
                ignored_policies.append(policy)
                continue

            # policy_body of None or of a non-dict type is reported as unexpected
            # instead of failing on the lookup of the version
            new_policy_body = policy.get(POLICY_BODY)
            if not isinstance(new_policy_body, dict) or not new_policy_body.get(POLICY_VERSION):
                unexpected_policies.append(policy)
                continue

            policy_id = policy[POLICY_ID]
            deployed_policy = policies[policy_id].get(POLICY_BODY, {})
            if not deployed_policy or POLICY_VERSION not in deployed_policy \
            or not deployed_policy[POLICY_VERSION] \
            or deployed_policy[POLICY_VERSION] != new_policy_body[POLICY_VERSION]:
                policies[policy_id][POLICY_BODY] = new_policy_body
                # shallow copy of the policy with the node properties and the new body
                changed_policies.append(dict(policies[policy_id]))
            else:
                same_policies.append(policy)

        if same_policies:
            ctx.logger.info("same policies: {0}".format(json.dumps(same_policies)))
        if ignored_policies:
            ctx.logger.info("ignored policies: {0}".format(json.dumps(ignored_policies)))
        if unexpected_policies:
            ctx.logger.warn("unexpected policies: {0}".format(json.dumps(unexpected_policies)))

        if changed_policies:
            # assigned back as a whole
            # presumably needed for runtime_properties to register the change - TODO confirm
            ctx.instance.runtime_properties[POLICIES] = policies
            return changed_policies

    @staticmethod
    def update_policies_on_node(configs_only=True):
        """decorate each policy_update operation with @Policies.update_policies_on_node to
        filter out the updated_policies to only what applies to the current node instance,
        update runtime_properties["policies"]

        :configs_only: - set to True if expect to see only the config in updated_policies
            instead of the whole policy object (False)

        Passes through the filtered list of updated_policies that apply to the current node instance

        :updated_policies: contains the list of changed policy-configs when configs_only=True.

        :notify_app_through_script: in kwargs is set to True/False to indicate whether to invoke
        the script based on policy_apply_mode property in the blueprint

        The decorated func is not invoked (None is returned) when the context is not
        a node instance or when none of the updated_policies changed on this node instance.
        """
        def update_policies_decorator(func):
            """actual decorator"""
            if not func:
                return

            @wraps(func)
            def wrapper(updated_policies, **kwargs):
                """update matching policies on context"""
                if ctx.type != NODE_INSTANCE:
                    return

                updated_policies = Policies._update_policies_on_ctx(updated_policies)
                if updated_policies:
                    # True when at least one changed policy has policy_apply_mode == script.
                    # any() gives the boolean - max() with key would return the policy itself
                    notify_app_through_script = any(
                        pol.get(POLICY_APPLY_MODE) == POLICY_NOTIFICATION_SCRIPT
                        for pol in updated_policies
                    )

                    if configs_only:
                        # the policies without policy_body or config are dropped
                        updated_policies = [
                            policy[POLICY_BODY][POLICY_CONFIG]
                            for policy in updated_policies
                            if POLICY_BODY in policy
                            and POLICY_CONFIG in policy[POLICY_BODY]
                        ]
                    return func(updated_policies,
                                notify_app_through_script=notify_app_through_script, **kwargs)
            return wrapper
        return update_policies_decorator

    @staticmethod
    def get_notify_app_through_script():
        """returns True if any of the policy has property policy_apply_mode==script

        Returns None otherwise - including when the context is not a node instance
        or there are no policies in runtime_properties.
        """
        if ctx.type != NODE_INSTANCE \
        or POLICIES not in ctx.instance.runtime_properties:
            return
        policies = ctx.instance.runtime_properties[POLICIES]
        if not policies:
            return
        for policy_id in policies:
            if policies[policy_id].get(POLICY_APPLY_MODE) == POLICY_NOTIFICATION_SCRIPT:
                return True

    @staticmethod
    def get_policy_configs():
        """returns the list of policy configs from the runtime policies

        Only the policies that have policy_body with config are included.
        Returns None when the context is not a node instance
        or there are no policies in runtime_properties.
        """
        if ctx.type != NODE_INSTANCE \
        or POLICIES not in ctx.instance.runtime_properties:
            return
        policies = ctx.instance.runtime_properties[POLICIES]
        if not policies:
            return
        policy_configs = [
            policies[policy_id][POLICY_BODY][POLICY_CONFIG]
            for policy_id in policies
            if POLICY_BODY in policies[policy_id]
            and POLICY_CONFIG in policies[policy_id][POLICY_BODY]
        ]
        return policy_configs

    @staticmethod
    def shallow_merge_policies_into(config):
        """shallow merge the policy configs (dict) into config that is expected to be a dict

        :config: dict to merge into - it is updated in place, None is replaced with a new dict

        The top level keys of each policy config overwrite the same keys in config.
        A key with the value of None in the policy config removes that key from config.
        The policy configs that are not a dict are skipped.

        Returns config - unchanged when there are no policy configs or config is not a dict.
        """
        if config is None:
            config = {}
        policy_configs = Policies.get_policy_configs()
        if not policy_configs or not isinstance(config, dict):
            return config

        # deepcopy - config does not share the nested objects with runtime_properties
        for policy_config in copy.deepcopy(policy_configs):
            if not isinstance(policy_config, dict):
                continue

            config.update(policy_config)
            for cfg_item in policy_config:
                if policy_config[cfg_item] is None:
                    config.pop(cfg_item, None)

        return config