1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# ================================================================================
# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
"""utils for policy usage and conversions"""
import re
from ..policy_consts import POLICY_BODY, POLICY_ID
from ..utils import Utils
from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION
class PolicyUtils(object):
    """Stateless helpers for converting and selecting policies from policy-engine.

    A "policy_body" is the dict received from policy-engine; a "policy" is the
    wrapper dict {POLICY_ID: <policy_id>, POLICY_BODY: <policy_body>} produced
    by convert_to_policy().
    """

    # trailing ".<version>.<extension>" of a policy_name, e.g. ".3.xml"
    _policy_name_ext = re.compile(r'[.][0-9]+[.][a-zA-Z]+$')

    @staticmethod
    def _version_to_int(policy_version):
        """Return policy_version as an int, or None when it is not usable.

        Only values that offer isdigit() and consist solely of digits that
        int() can parse are accepted.  Anything else (None, numbers, "1.5",
        "", digit-like characters that int() rejects) yields None so that the
        caller can skip the policy instead of failing on it.
        """
        try:
            if not policy_version.isdigit():
                return None
            return int(policy_version)
        except (AttributeError, ValueError):
            # AttributeError: not a string-like value (no isdigit)
            # ValueError: isdigit() accepted a character that int() does not
            return None

    @staticmethod
    def extract_policy_id(policy_name):
        """Strip the trailing ".<version>.<extension>" from policy_name.

        policy_name = policy_id + "." + <version> + "." + <extension>

        For instance,
            policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
            policy_id = DCAE_alex.Config_alex_policy_number_1
            policy_scope = DCAE_alex
            policy_class = Config
            policy_version = 3
            type = extension = xml
            delimiter = "."
            policy_class_delimiter = "_"
            policy_name in PAP = DCAE_alex.alex_policy_number_1

        Returns None for an empty policy_name.  A policy_name without the
        version-extension suffix is returned unchanged.
        """
        if not policy_name:
            return None
        return PolicyUtils._policy_name_ext.sub('', policy_name)

    @staticmethod
    def parse_policy_config(policy):
        """Try parsing the config inside the policy body, in place.

        When policy[POLICY_BODY][POLICY_CONFIG] is non-empty, it is replaced
        with the result of Utils.safe_json_parse().  The same policy object is
        returned; an empty policy is returned as is.
        """
        if not policy:
            return policy

        # tolerate a body that is missing or explicitly None
        policy_body = policy.get(POLICY_BODY) or {}
        config = policy_body.get(POLICY_CONFIG)
        if config:
            policy_body[POLICY_CONFIG] = Utils.safe_json_parse(config)
        return policy

    @staticmethod
    def convert_to_policy(policy_body):
        """Wrap policy_body received from policy-engine with policy_id.

        Returns {POLICY_ID: policy_id, POLICY_BODY: policy_body}, or None when
        policy_body is empty, has no policy name, has no policy version,
        or no policy_id can be extracted from the policy name.
        """
        if not policy_body:
            return None

        policy_name = policy_body.get(POLICY_NAME)
        policy_version = policy_body.get(POLICY_VERSION)
        if not policy_name or policy_version is None:
            return None

        policy_id = PolicyUtils.extract_policy_id(policy_name)
        if not policy_id:
            return None
        return {POLICY_ID: policy_id, POLICY_BODY: policy_body}

    @staticmethod
    def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
        """Pick the single policy_body with the highest version and wrap it.

        For some reason, the policy-engine returns all versions of the
        policy_bodies.  DCAE-Controller is only interested in the latest
        version.

        The bodies are not grouped by policy_id here: the highest version
        among all accepted bodies wins, the first one seen on a tie.

        :param policy_bodies: iterable of policy_body dicts
        :param expected_versions: when non-empty, only bodies whose version
            (as found in the body) is in this collection are considered
        :param ignore_policy_names: when non-empty, bodies whose policy name
            is in this collection are skipped
        :return: the policy (see convert_to_policy) with its config parsed,
            or None when there is no acceptable policy_body
        """
        if not policy_bodies:
            return None

        latest_policy_body = {}
        latest_version = None
        for policy_body in policy_bodies:
            if not policy_body:
                # same tolerance to empty entries as in select_latest_policies
                continue

            policy_name = policy_body.get(POLICY_NAME)
            policy_version = policy_body.get(POLICY_VERSION)
            version = PolicyUtils._version_to_int(policy_version)
            if not policy_name or version is None:
                continue

            if expected_versions and policy_version not in expected_versions:
                continue

            if ignore_policy_names and policy_name in ignore_policy_names:
                continue

            if latest_version is None or latest_version < version:
                latest_policy_body = policy_body
                latest_version = version

        # convert_to_policy returns None for the initial empty dict,
        # and parse_policy_config passes that None through
        return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))

    @staticmethod
    def select_latest_policies(policy_bodies):
        """Pick the latest version of each policy found in policy_bodies.

        For some reason, the policy-engine returns all versions of the
        policy_bodies.  DCAE-Controller is only interested in the latest
        versions.

        :param policy_bodies: iterable of policy_body dicts
        :return: dict of policy_id -> policy (see convert_to_policy) with the
            config parsed; empty dict when nothing acceptable is found.
            Bodies that cannot be converted or have an unusable version are
            skipped.
        """
        if not policy_bodies:
            return {}

        policies = {}
        versions = {}  # policy_id -> int version of the policy kept in policies
        for policy_body in policy_bodies:
            policy = PolicyUtils.convert_to_policy(policy_body)
            if not policy:
                continue

            policy_id = policy.get(POLICY_ID)
            version = PolicyUtils._version_to_int(
                policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
            if not policy_id or version is None:
                continue

            if policy_id not in policies or version > versions[policy_id]:
                policies[policy_id] = policy
                versions[policy_id] = version

        # parse the config only for the policies that made it to the result
        for policy in policies.values():
            PolicyUtils.parse_policy_config(policy)

        return policies
|