diff options
Diffstat (limited to 'policyhandler/policy_matcher.py')
-rw-r--r-- | policyhandler/policy_matcher.py | 264 |
1 files changed, 0 insertions, 264 deletions
diff --git a/policyhandler/policy_matcher.py b/policyhandler/policy_matcher.py deleted file mode 100644 index d0786ba..0000000 --- a/policyhandler/policy_matcher.py +++ /dev/null @@ -1,264 +0,0 @@ -# ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. - -"""policy-matcher matches the policies from deployment-handler to policies from policy-engine""" - -import json -import logging -import re - -from .deploy_handler import DeployHandler, PolicyUpdateMessage -from .onap.audit import AuditHttpCode, AuditResponseCode -from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, - MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER, - POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS) -from .policy_rest import PolicyRest -from .policy_utils import RegexCoarser - - -class PolicyMatcher(object): - """policy-matcher - static class""" - _logger = logging.getLogger("policy_handler.policy_matcher") - PENDING_UPDATE = "pending_update" - - @staticmethod - def get_deployed_policies(audit): - """get the deployed policies and policy-filters""" - deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) - - if audit.is_not_found(): - warning_txt = "got no deployed policies or policy-filters" - PolicyMatcher._logger.warning(warning_txt) - return {"warning": warning_txt}, None, None - - if not audit.is_success() or (not deployed_policies and not deployed_policy_filters): - error_txt = "failed to retrieve policies from deployment-handler" - PolicyMatcher._logger.error(error_txt) - return {"error": error_txt}, None, None - - return None, deployed_policies, deployed_policy_filters - - - @staticmethod - def build_catch_up_message(audit, deployed_policies, deployed_policy_filters): - """ - find the latest policies from policy-engine for the deployed policies and policy-filters - """ - - if not (deployed_policies or deployed_policy_filters): - error_txt = "no deployed policies or policy-filters" - PolicyMatcher._logger.warning(error_txt) - return {"error": error_txt}, None - - coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns( - audit, deployed_policies, deployed_policy_filters) - - if not coarse_regex_patterns: - error_txt = ("failed to construct the coarse_regex_patterns from " + - "deployed_policies: {} and deployed_policy_filters: {}" - .format(deployed_policies, deployed_policy_filters)) - PolicyMatcher._logger.error(audit.error( - error_txt, error_code=AuditResponseCode.DATA_ERROR)) - audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) - return {"error": error_txt}, None - - pdp_response = PolicyRest.get_latest_policies( - audit, policy_filters=[{POLICY_NAME: policy_name_pattern} - for policy_name_pattern in coarse_regex_patterns] - ) - - if not audit.is_success(): - error_txt = "failed to retrieve policies from policy-engine" - PolicyMatcher._logger.warning(error_txt) - return {"error": error_txt}, None - - latest_policies = pdp_response.get(LATEST_POLICIES, {}) - errored_policies = pdp_response.get(ERRORED_POLICIES, {}) - - latest_policies, changed_policies, policy_filter_matches = PolicyMatcher._match_policies( - audit, latest_policies, deployed_policies, deployed_policy_filters) - - errored_policies = dict((policy_id, policy) - for (policy_id, policy) in errored_policies.items() - if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS)) - - removed_policies = dict( - (policy_id, True) - for (policy_id, deployed_policy) in deployed_policies.items() - if deployed_policy.get(POLICY_VERSIONS) - and policy_id not in latest_policies - and policy_id not in errored_policies - ) - - return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies}, - PolicyUpdateMessage(changed_policies, - removed_policies, - policy_filter_matches)) - - - @staticmethod - def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters): - """calculate the coarsed patterns on policy-names in policies and policy-filters""" - coarse_regex = RegexCoarser() - for policy_id in deployed_policies or {}: - coarse_regex.add_regex_pattern(policy_id) - - for policy_filter in (deployed_policy_filters or {}).values(): - policy_name_pattern = policy_filter.get(POLICY_FILTER, {}).get(POLICY_NAME) - coarse_regex.add_regex_pattern(policy_name_pattern) - - coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns() - PolicyMatcher._logger.debug( - audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})" - .format(coarse_regex_patterns, - coarse_regex.get_combined_regex_pattern(), - coarse_regex.patterns))) - return coarse_regex_patterns - - - @staticmethod - def match_to_deployed_policies(audit, policies_updated, policies_removed): - """match the policies_updated, policies_removed versus deployed policies""" - deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) - if not audit.is_success(): - return {}, {}, {} - - _, changed_policies, policy_filter_matches = PolicyMatcher._match_policies( - audit, policies_updated, deployed_policies, deployed_policy_filters) - - policies_removed = dict((policy_id, policy) - for (policy_id, policy) in policies_removed.items() - if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS)) - - return changed_policies, policies_removed, policy_filter_matches - - - @staticmethod - def _match_policies(audit, policies, deployed_policies, deployed_policy_filters): - """ - Match policies to deployed policies either by policy_id or the policy-filters. - - Also calculates the policies that changed in comparison to deployed policies - """ - matching_policies = {} - changed_policies = {} - policy_filter_matches = {} - - policies = policies or {} - deployed_policies = deployed_policies or {} - deployed_policy_filters = deployed_policy_filters or {} - - for (policy_id, policy) in policies.items(): - new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) - deployed_policy = deployed_policies.get(policy_id) - - if deployed_policy: - matching_policies[policy_id] = policy - - policy_changed = (deployed_policy and new_version - and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE) - or {new_version} ^ - deployed_policy.get(POLICY_VERSIONS, {}).keys())) - if policy_changed: - changed_policies[policy_id] = policy - policy_filter_matches[policy_id] = {} - - in_filters = False - for (policy_filter_id, policy_filter) in deployed_policy_filters.items(): - if not PolicyMatcher._match_policy_to_filter( - audit, policy_id, policy, - policy_filter_id, policy_filter.get(POLICY_FILTER)): - continue - - if policy_changed or not deployed_policy: - in_filters = True - if policy_id not in policy_filter_matches: - policy_filter_matches[policy_id] = {} - policy_filter_matches[policy_id][policy_filter_id] = True - - if not deployed_policy and in_filters: - matching_policies[policy_id] = policy - changed_policies[policy_id] = policy - - return matching_policies, changed_policies, policy_filter_matches - - - @staticmethod - def _match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter): - """Match the policy to the policy-filter""" - if not policy_id or not policy or not policy_filter or not policy_filter_id: - return False - - filter_policy_name = policy_filter.get(POLICY_NAME) - if not filter_policy_name: - return False - - policy_body = policy.get(POLICY_BODY) - if not policy_body: - return False - - policy_name = policy_body.get(POLICY_NAME) - if not policy_name: - return False - - log_line = "policy {} to filter id {}: {}".format(json.dumps(policy), - policy_filter_id, - json.dumps(policy_filter)) - # PolicyMatcher._logger.debug(audit.debug("matching {}...".format(log_line))) - - if (filter_policy_name != policy_id and filter_policy_name != policy_name - and not re.match(filter_policy_name, policy_name)): - PolicyMatcher._logger.debug( - audit.debug("not match by policyName: {} != {}: {}" - .format(policy_name, filter_policy_name, log_line))) - return False - - matching_conditions = policy_body.get(MATCHING_CONDITIONS, {}) - if not isinstance(matching_conditions, dict): - return False - - filter_onap_name = policy_filter.get("onapName") - policy_onap_name = matching_conditions.get("ONAPName") - if filter_onap_name and filter_onap_name != policy_onap_name: - PolicyMatcher._logger.debug( - audit.debug("not match by ONAPName: {} != {}: {}" - .format(policy_onap_name, filter_onap_name, log_line))) - return False - - filter_config_name = policy_filter.get("configName") - policy_config_name = matching_conditions.get("ConfigName") - if filter_config_name and filter_config_name != policy_config_name: - PolicyMatcher._logger.debug( - audit.debug("not match by configName: {} != {}: {}" - .format(policy_config_name, filter_config_name, log_line))) - return False - - filter_config_attributes = policy_filter.get("configAttributes") - if filter_config_attributes and isinstance(filter_config_attributes, dict): - for filter_key, filter_config_attribute in filter_config_attributes.items(): - if (filter_key not in matching_conditions - or filter_config_attribute != matching_conditions.get(filter_key)): - PolicyMatcher._logger.debug( - audit.debug("not match by configAttributes: {} != {}: {}" - .format(json.dumps(matching_conditions), - json.dumps(filter_config_attributes), - log_line))) - return False - - PolicyMatcher._logger.debug(audit.debug("matched {}".format(log_line))) - return True |