1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# ================================================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
"""policy-matcher matches the policies from deployment-handler to policies from policy-engine"""
import os
from ..deploy_handler import DeployHandler, PolicyUpdateMessage
from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
POLICY_VERSIONS)
from ..utils import Utils
from .pdp_consts import POLICY_VERSION
from .policy_rest import PolicyRest
_LOGGER = Utils.get_logger(__file__)
class PolicyMatcher(object):
"""policy-matcher - static class"""
PENDING_UPDATE = "pending_update"
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
@staticmethod
def build_catch_up_message(audit, deployed_policies, _=None):
"""find the latest policies from policy-engine for the deployed policies"""
if not deployed_policies:
error_txt = "no deployed policies"
_LOGGER.warning(error_txt)
return {"error": error_txt}, None
pdp_response = PolicyRest.get_latest_policies(audit, policy_ids=list(deployed_policies))
if not audit.is_success():
error_txt = "failed to retrieve policies from policy-engine"
_LOGGER.warning(error_txt)
return {"error": error_txt}, None
latest_policies = pdp_response.get(LATEST_POLICIES, {})
errored_policies = pdp_response.get(ERRORED_POLICIES, {})
latest_policies, changed_policies = PolicyMatcher._match_policies(
latest_policies, deployed_policies)
errored_policies = dict((policy_id, policy)
for (policy_id, policy) in errored_policies.items()
if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
removed_policies = dict(
(policy_id, True)
for (policy_id, deployed_policy) in deployed_policies.items()
if deployed_policy.get(POLICY_VERSIONS)
and policy_id not in latest_policies
and policy_id not in errored_policies
)
return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies},
PolicyUpdateMessage(changed_policies, removed_policies))
@staticmethod
def match_to_deployed_policies(audit, policies_updated, policies_removed):
"""match the policies_updated, policies_removed versus deployed policies"""
_, deployed_policies, _ = DeployHandler.get_deployed_policies(audit)
if not audit.is_success():
return {}, {}, {}
_, changed_policies = PolicyMatcher._match_policies(policies_updated, deployed_policies)
policies_removed = dict((policy_id, policy)
for (policy_id, policy) in policies_removed.items()
if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
return changed_policies, policies_removed, {}
@staticmethod
def _match_policies(policies, deployed_policies):
"""
Match policies to deployed policies by policy_id.
Also calculates the policies that changed in comparison to deployed policies
"""
matching_policies = {}
changed_policies = {}
policies = policies or {}
deployed_policies = deployed_policies or {}
for (policy_id, policy) in policies.items():
new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
deployed_policy = deployed_policies.get(policy_id)
if deployed_policy:
matching_policies[policy_id] = policy
policy_changed = (deployed_policy and new_version
and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE)
or {new_version} ^
deployed_policy.get(POLICY_VERSIONS, {}).keys()))
if policy_changed:
changed_policies[policy_id] = policy
return matching_policies, changed_policies
|