aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/policy_utils.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2019-04-01 11:32:06 -0400
committerAlex Shatov <alexs@att.com>2019-04-01 11:32:06 -0400
commit9a4d3c5b8dc9c7697275cab38ee45b014dff9e55 (patch)
treed4d55bcc8bc237ee3199d0e6a13f5e7cd95fadea /policyhandler/policy_utils.py
parentebc1a062328e53e97e4d24ed111534cfc567a809 (diff)
5.0.0 policy-handler - new PDP API or old PDP API4.0.0-ONAPdublin
- in R4 Dublin the policy-engine introduced a totally new API - policy-handler now has a startup option to either use the new PDP API or the old PDP API that was created-updated before the end of 2018 - see README.md and README_pdp_api_v0.md for instructions on how to setup the policy-handler running either with the new PDP API or the old (pdp_api_v0) PDP API - this is a massive refactoring that changed almost all the source files, but kept the old logic when using the old (pdp_api_v0) PDP API - all the code related to PDP API version is split into two subfolders = pdp_api/ contains the new PDP API source code = pdp_api_v0/ contains the old (2018) PDP API source code = pdp_client.py imports from either pdp_api or pdp_api_v0 = the rest of the code is only affected when it needs to branch the logic - logging to policy_handler.log now shows the path of the source file to allow tracing which PDP API is actually used - when the new PDP API is used, the policy-update flow is disabled = passive mode of operation = no web-socket = no periodic catch_up = no policy-filters = reduced web-API - only a single /policy_latest endpoint is available /policies_latest returns 404 /catch_up request is accepted, but ignored - on new PDP API: http /policy_latest returns the new data from the new PDP API with the following fields added by the policy-handler to keep other policy related parts intact in R4 (see pdp_api/policy_utils.py) = "policyName" = policy_id + "." + "policyVersion" + ".xml" = "policyVersion" = str("metadata"."policy-version") = "config" - is the renamed "properties" from the new PDP API response - unit tests are split into two subfolders as well = main/ for the new PDP API testing = pdp_api_v0/ for the old (2018) PDP API - removed the following line from the license text of changed files ECOMP is a trademark and service mark of AT&T Intellectual Property. - the new PDP API is expected to be extended and redesigned in R5 El Alto - on retiring the old PDP API - the intention is to be able to remove the pdp_api_v0/ subfolder and minimal related cleanup of the code that imports that as well as the cleanup of the config.py, etc. Change-Id: Ief9a2ae4541300308caaf97377f4ed051535dbe4 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-1128
Diffstat (limited to 'policyhandler/policy_utils.py')
-rw-r--r--policyhandler/policy_utils.py380
1 files changed, 0 insertions, 380 deletions
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py
deleted file mode 100644
index 08d26f0..0000000
--- a/policyhandler/policy_utils.py
+++ /dev/null
@@ -1,380 +0,0 @@
-# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-"""utils for policy usage and conversions"""
-
-import json
-import logging
-import re
-from copy import deepcopy
-from typing import Pattern
-
-from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
- POLICY_VERSION)
-
-
-class PolicyUtils(object):
- """policy-client utils"""
- _logger = logging.getLogger("policy_handler.policy_utils")
- _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
-
- @staticmethod
- def extract_policy_id(policy_name):
- """ policy_name = policy_id + "." + <version> + "." + <extension>
- For instance,
- policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
- policy_id = DCAE_alex.Config_alex_policy_number_1
- policy_scope = DCAE_alex
- policy_class = Config
- policy_version = 3
- type = extension = xml
- delimiter = "."
- policy_class_delimiter = "_"
- policy_name in PAP = DCAE_alex.alex_policy_number_1
- """
- if not policy_name:
- return
- return PolicyUtils._policy_name_ext.sub('', policy_name)
-
- @staticmethod
- def parse_policy_config(policy):
- """try parsing the config in policy."""
- if not policy:
- return policy
- config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
- if config:
- policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
- return policy
-
- @staticmethod
- def convert_to_policy(policy_body):
- """wrap policy_body received from policy-engine with policy_id."""
- if not policy_body:
- return None
- policy_name = policy_body.get(POLICY_NAME)
- policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version:
- return None
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id:
- return None
- return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
-
- @staticmethod
- def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
- """For some reason, the policy-engine returns all version of the policy_bodies.
- DCAE-Controller is only interested in the latest version
- """
- if not policy_bodies:
- return
- latest_policy_body = {}
- for policy_body in policy_bodies:
- policy_name = policy_body.get(POLICY_NAME)
- policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version or not policy_version.isdigit():
- continue
- if expected_versions and policy_version not in expected_versions:
- continue
- if ignore_policy_names and policy_name in ignore_policy_names:
- continue
-
- if (not latest_policy_body
- or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)):
- latest_policy_body = policy_body
-
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))
-
- @staticmethod
- def select_latest_policies(policy_bodies):
- """For some reason, the policy-engine returns all version of the policy_bodies.
- DCAE-Controller is only interested in the latest versions
- """
- if not policy_bodies:
- return {}
- policies = {}
- for policy_body in policy_bodies:
- policy = PolicyUtils.convert_to_policy(policy_body)
- if not policy:
- continue
- policy_id = policy.get(POLICY_ID)
- policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
- if not policy_id or not policy_version or not policy_version.isdigit():
- continue
- if (policy_id not in policies
- or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
- policies[policy_id] = policy
-
- for policy_id in policies:
- policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
-
- return policies
-
-class Utils(object):
- """general purpose utils"""
- _logger = logging.getLogger("policy_handler.utils")
-
- @staticmethod
- def safe_json_parse(json_str):
- """try parsing json without exception - returns the json_str back if fails"""
- if not json_str:
- return json_str
- try:
- return json.loads(json_str)
- except (ValueError, TypeError) as err:
- Utils._logger.warning("unexpected json error(%s): len(%s) str[:100]: (%s)",
- str(err), len(json_str), str(json_str)[:100])
- return json_str
-
- @staticmethod
- def are_the_same(body_1, body_2, json_dumps=None):
- """check whether both objects are the same"""
- if not json_dumps:
- json_dumps = json.dumps
- if (body_1 and not body_2) or (not body_1 and body_2):
- Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
- return False
-
- if body_1 is None and body_2 is None:
- return True
-
- if isinstance(body_1, list) and isinstance(body_2, list):
- if len(body_1) != len(body_2):
- Utils._logger.debug("len %s != %s", json_dumps(body_1), json_dumps(body_2))
- return False
-
- for val_1, val_2 in zip(body_1, body_2):
- if not Utils.are_the_same(val_1, val_2, json_dumps):
- return False
- return True
-
- if isinstance(body_1, dict) and isinstance(body_2, dict):
- if body_1.keys() ^ body_2.keys():
- Utils._logger.debug("keys %s != %s", json_dumps(body_1), json_dumps(body_2))
- return False
-
- for key, val_1 in body_1.items():
- if not Utils.are_the_same(val_1, body_2[key], json_dumps):
- return False
- return True
-
- # ... here when primitive values or mismatched types ...
- the_same_values = (body_1 == body_2)
- if not the_same_values:
- Utils._logger.debug("values %s != %s", body_1, body_2)
- return the_same_values
-
-class RegexCoarser(object):
- """
- utility to combine or coarse the collection of regex patterns
- into a single regex that is at least not narrower (wider or the same)
- than the collection regexes
-
- inspired by https://github.com/spadgos/regex-combiner in js
- """
- ENDER = '***'
- GROUPERS = {'{': '}', '[': ']', '(': ')'}
- MODIFIERS = '*?+'
- CHOICE_STARTER = '('
- HIDDEN_CHOICE_STARTER = '(?:'
- ANY_CHARS = '.*'
- LINE_START = '^'
-
- def __init__(self, regex_patterns=None):
- """regex coarser"""
- self.trie = {}
- self.patterns = []
- self.add_regex_patterns(regex_patterns)
-
- def get_combined_regex_pattern(self):
- """gets the pattern for the combined regex"""
- trie = deepcopy(self.trie)
- RegexCoarser._compress(trie)
- return RegexCoarser._trie_to_pattern(trie)
-
- def get_coarse_regex_patterns(self, max_length=100):
- """gets the patterns for the coarse regex"""
- trie = deepcopy(self.trie)
- RegexCoarser._compress(trie)
- patterns = RegexCoarser._trie_to_pattern(trie, True)
-
- root_patterns = []
- for pattern in patterns:
- left, _, choice = pattern.partition(RegexCoarser.CHOICE_STARTER)
- if choice and left and left.strip() != RegexCoarser.LINE_START and not left.isspace():
- pattern = left + RegexCoarser.ANY_CHARS
- root_patterns.append(pattern)
- root_patterns = RegexCoarser._join_patterns(root_patterns, max_length)
-
- if not root_patterns or root_patterns == ['']:
- return []
- return root_patterns
-
-
- def add_regex_patterns(self, new_regex_patterns):
- """adds the new_regex patterns to RegexPatternCoarser"""
- if not new_regex_patterns or not isinstance(new_regex_patterns, list):
- return
- for new_regex_pattern in new_regex_patterns:
- self.add_regex_pattern(new_regex_pattern)
-
- def add_regex_pattern(self, new_regex_pattern):
- """adds the new_regex to RegexPatternCoarser"""
- new_regex_pattern = RegexCoarser._regex_pattern_to_string(new_regex_pattern)
- if not new_regex_pattern:
- return
-
- self.patterns.append(new_regex_pattern)
-
- tokens = RegexCoarser._tokenize(new_regex_pattern)
- last_token_idx = len(tokens) - 1
- trie_node = self.trie
- for idx, token in enumerate(tokens):
- if token not in trie_node:
- trie_node[token] = {}
- if idx == last_token_idx:
- trie_node[token][RegexCoarser.ENDER] = {}
- trie_node = trie_node[token]
-
- @staticmethod
- def _regex_pattern_to_string(regex_pattern):
- """convert regex pattern to string"""
- if not regex_pattern:
- return ''
-
- if isinstance(regex_pattern, str):
- return regex_pattern
-
- if isinstance(regex_pattern, Pattern):
- return regex_pattern.pattern
- return None
-
- @staticmethod
- def _tokenize(regex_pattern):
- """tokenize the regex pattern for trie assignment"""
- tokens = []
- token = ''
- group_ender = None
- use_next = False
-
- for char in regex_pattern:
- if use_next:
- use_next = False
- token += char
- char = None
-
- if char == '\\':
- use_next = True
- token += char
- continue
-
- if not group_ender and char in RegexCoarser.GROUPERS:
- group_ender = RegexCoarser.GROUPERS[char]
- token = char
- char = None
-
- if char is None:
- pass
- elif char == group_ender:
- token += char
- group_ender = None
- if char == '}': # this group is a modifier
- tokens[len(tokens) - 1] += token
- token = ''
- continue
- elif char in RegexCoarser.MODIFIERS:
- if group_ender:
- token += char
- else:
- tokens[len(tokens) - 1] += char
- continue
- else:
- token += char
-
- if not group_ender:
- tokens.append(token)
- token = ''
-
- if token:
- tokens.append(token)
- return tokens
-
- @staticmethod
- def _compress(trie):
- """compress trie into shortest leaves"""
- for key, subtrie in trie.items():
- RegexCoarser._compress(subtrie)
- subkeys = list(subtrie.keys())
- if len(subkeys) == 1:
- trie[key + subkeys[0]] = subtrie[subkeys[0]]
- del trie[key]
-
- @staticmethod
- def _trie_to_pattern(trie, top_keep=False):
- """convert trie to the regex pattern"""
- patterns = [
- key.replace(RegexCoarser.ENDER, '') + RegexCoarser._trie_to_pattern(subtrie)
- for key, subtrie in trie.items()
- ]
-
- if top_keep:
- return patterns
-
- return RegexCoarser._join_patterns(patterns)[0]
-
- @staticmethod
- def _join_patterns(patterns, max_length=0):
- """convert list of patterns to the segmented list of dense regex patterns"""
- if not patterns:
- return ['']
-
- if len(patterns) == 1:
- return patterns
-
- if not max_length:
- return [RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns) + ')']
-
- long_patterns = []
- join_patterns = []
- for pattern in patterns:
- len_pattern = len(pattern)
- if not len_pattern:
- continue
- if len_pattern >= max_length:
- long_patterns.append(pattern)
- continue
-
- for idx, patterns_to_join in enumerate(join_patterns):
- patterns_to_join, len_patterns_to_join = patterns_to_join
- if len_pattern + len_patterns_to_join < max_length:
- patterns_to_join.append(pattern)
- len_patterns_to_join += len_pattern
- join_patterns[idx] = (patterns_to_join, len_patterns_to_join)
- len_pattern = 0
- break
- if len_pattern:
- join_patterns.append(([pattern], len_pattern))
- join_patterns.sort(key=lambda x: x[1])
-
- if join_patterns:
- # pattern, _, choice = pattern.endswith(RegexCoarser.ANY_CHARS)
- join_patterns = [
- RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns_to_join) + ')'
- for patterns_to_join, _ in join_patterns
- ]
-
- return join_patterns + long_patterns