diff options
Diffstat (limited to 'policyhandler/policy_utils.py')
-rw-r--r-- | policyhandler/policy_utils.py | 260 |
1 files changed, 232 insertions, 28 deletions
diff --git a/policyhandler/policy_utils.py b/policyhandler/policy_utils.py index c2a8b07..c63f382 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/policy_utils.py @@ -16,15 +16,18 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -"""policy-client communicates with policy-engine thru REST API""" +"""utils for policy usage and conversions""" import json import logging import re +from copy import deepcopy +from typing import Pattern from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, POLICY_VERSION) + class PolicyUtils(object): """policy-client utils""" _logger = logging.getLogger("policy_handler.policy_utils") @@ -59,30 +62,30 @@ class PolicyUtils(object): return policy @staticmethod - def convert_to_policy(policy_config): - """wrap policy_config received from policy-engine with policy_id.""" - if not policy_config: - return - policy_name = policy_config.get(POLICY_NAME) - policy_version = policy_config.get(POLICY_VERSION) + def convert_to_policy(policy_body): + """wrap policy_body received from policy-engine with policy_id.""" + if not policy_body: + return None + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) if not policy_name or not policy_version: - return + return None policy_id = PolicyUtils.extract_policy_id(policy_name) if not policy_id: - return - return {POLICY_ID:policy_id, POLICY_BODY:policy_config} + return None + return {POLICY_ID:policy_id, POLICY_BODY:policy_body} @staticmethod - def select_latest_policy(policy_configs, min_version_expected=None, ignore_policy_names=None): - """For some reason, the policy-engine returns all version of the policy_configs. + def select_latest_policy(policy_bodies, min_version_expected=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_bodies. DCAE-Controller is only interested in the latest version """ - if not policy_configs: + if not policy_bodies: return - latest_policy_config = {} - for policy_config in policy_configs: - policy_name = policy_config.get(POLICY_NAME) - policy_version = policy_config.get(POLICY_VERSION) + latest_policy_body = {} + for policy_body in policy_bodies: + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) if not policy_name or not policy_version or not policy_version.isdigit(): continue policy_version = int(policy_version) @@ -91,30 +94,30 @@ class PolicyUtils(object): if ignore_policy_names and policy_name in ignore_policy_names: continue - if not latest_policy_config \ - or int(latest_policy_config[POLICY_VERSION]) < policy_version: - latest_policy_config = policy_config + if (not latest_policy_body + or int(latest_policy_body[POLICY_VERSION]) < policy_version): + latest_policy_body = policy_body - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_config)) + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body)) @staticmethod - def select_latest_policies(policy_configs): - """For some reason, the policy-engine returns all version of the policy_configs. + def select_latest_policies(policy_bodies): + """For some reason, the policy-engine returns all version of the policy_bodies. DCAE-Controller is only interested in the latest versions """ - if not policy_configs: + if not policy_bodies: return {} policies = {} - for policy_config in policy_configs: - policy = PolicyUtils.convert_to_policy(policy_config) + for policy_body in policy_bodies: + policy = PolicyUtils.convert_to_policy(policy_body) if not policy: continue policy_id = policy.get(POLICY_ID) policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) if not policy_id or not policy_version or not policy_version.isdigit(): continue - if policy_id not in policies \ - or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION]): + if (policy_id not in policies + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])): policies[policy_id] = policy for policy_id in policies: @@ -173,3 +176,204 @@ class Utils(object): if not the_same_values: Utils._logger.debug("values %s != %s", body_1, body_2) return the_same_values + +class RegexCoarser(object): + """ + utility to combine or coarse the collection of regex patterns + into a single regex that is at least not narrower (wider or the same) + than the collection regexes + + inspired by https://github.com/spadgos/regex-combiner in js + """ + ENDER = '***' + GROUPERS = {'{': '}', '[': ']', '(': ')'} + MODIFIERS = '*?+' + CHOICE_STARTER = '(' + HIDDEN_CHOICE_STARTER = '(?:' + ANY_CHARS = '.*' + LINE_START = '^' + + def __init__(self, regex_patterns=None): + """regex coarser""" + self.trie = {} + self.patterns = [] + self.add_regex_patterns(regex_patterns) + + def get_combined_regex_pattern(self): + """gets the pattern for the combined regex""" + trie = deepcopy(self.trie) + RegexCoarser._compress(trie) + return RegexCoarser._trie_to_pattern(trie) + + def get_coarse_regex_patterns(self, max_length=100): + """gets the patterns for the coarse regex""" + trie = deepcopy(self.trie) + RegexCoarser._compress(trie) + patterns = RegexCoarser._trie_to_pattern(trie, True) + + root_patterns = [] + for pattern in patterns: + left, _, choice = pattern.partition(RegexCoarser.CHOICE_STARTER) + if choice and left and left.strip() != RegexCoarser.LINE_START and not left.isspace(): + pattern = left + RegexCoarser.ANY_CHARS + root_patterns.append(pattern) + root_patterns = RegexCoarser._join_patterns(root_patterns, max_length) + + if not root_patterns or root_patterns == ['']: + return [] + return root_patterns + + + def add_regex_patterns(self, new_regex_patterns): + """adds the new_regex patterns to RegexPatternCoarser""" + if not new_regex_patterns or not isinstance(new_regex_patterns, list): + return + for new_regex_pattern in new_regex_patterns: + self.add_regex_pattern(new_regex_pattern) + + def add_regex_pattern(self, new_regex_pattern): + """adds the new_regex to RegexPatternCoarser""" + new_regex_pattern = RegexCoarser._regex_pattern_to_string(new_regex_pattern) + if not new_regex_pattern: + return + + self.patterns.append(new_regex_pattern) + + tokens = RegexCoarser._tokenize(new_regex_pattern) + last_token_idx = len(tokens) - 1 + trie_node = self.trie + for idx, token in enumerate(tokens): + if token not in trie_node: + trie_node[token] = {} + if idx == last_token_idx: + trie_node[token][RegexCoarser.ENDER] = {} + trie_node = trie_node[token] + + @staticmethod + def _regex_pattern_to_string(regex_pattern): + """convert regex pattern to string""" + if not regex_pattern: + return '' + + if isinstance(regex_pattern, str): + return regex_pattern + + if isinstance(regex_pattern, Pattern): + return regex_pattern.pattern + return None + + @staticmethod + def _tokenize(regex_pattern): + """tokenize the regex pattern for trie assignment""" + tokens = [] + token = '' + group_ender = None + use_next = False + + for char in regex_pattern: + if use_next: + use_next = False + token += char + char = None + + if char == '\\': + use_next = True + token += char + continue + + if not group_ender and char in RegexCoarser.GROUPERS: + group_ender = RegexCoarser.GROUPERS[char] + token = char + char = None + + if char is None: + pass + elif char == group_ender: + token += char + group_ender = None + if char == '}': # this group is a modifier + tokens[len(tokens) - 1] += token + token = '' + continue + elif char in RegexCoarser.MODIFIERS: + if group_ender: + token += char + else: + tokens[len(tokens) - 1] += char + continue + else: + token += char + + if not group_ender: + tokens.append(token) + token = '' + + if token: + tokens.append(token) + return tokens + + @staticmethod + def _compress(trie): + """compress trie into shortest leaves""" + for key, subtrie in trie.items(): + RegexCoarser._compress(subtrie) + subkeys = list(subtrie.keys()) + if len(subkeys) == 1: + trie[key + subkeys[0]] = subtrie[subkeys[0]] + del trie[key] + + @staticmethod + def _trie_to_pattern(trie, top_keep=False): + """convert trie to the regex pattern""" + patterns = [ + key.replace(RegexCoarser.ENDER, '') + RegexCoarser._trie_to_pattern(subtrie) + for key, subtrie in trie.items() + ] + + if top_keep: + return patterns + + return RegexCoarser._join_patterns(patterns)[0] + + @staticmethod + def _join_patterns(patterns, max_length=0): + """convert list of patterns to the segmented list of dense regex patterns""" + if not patterns: + return [''] + + if len(patterns) == 1: + return patterns + + if not max_length: + return [RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns) + ')'] + + long_patterns = [] + join_patterns = [] + for pattern in patterns: + len_pattern = len(pattern) + if not len_pattern: + continue + if len_pattern >= max_length: + long_patterns.append(pattern) + continue + + for idx, patterns_to_join in enumerate(join_patterns): + patterns_to_join, len_patterns_to_join = patterns_to_join + if len_pattern + len_patterns_to_join < max_length: + patterns_to_join.append(pattern) + len_patterns_to_join += len_pattern + join_patterns[idx] = (patterns_to_join, len_patterns_to_join) + len_pattern = 0 + break + if len_pattern: + join_patterns.append(([pattern], len_pattern)) + join_patterns.sort(key=lambda x: x[1]) + + if join_patterns: + # pattern, _, choice = pattern.endswith(RegexCoarser.ANY_CHARS) + join_patterns = [ + RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns_to_join) + ')' + for patterns_to_join, _ in join_patterns + ] + + return join_patterns + long_patterns |