aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/pdp_api_v0/policy_utils.py
blob: 2cbb22c024e209999015ee9d08fc4b7f981d322d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# ================================================================================
# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#

"""utils for policy usage and conversions"""

import re

from ..policy_consts import POLICY_BODY, POLICY_ID
from ..utils import Utils
from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION


class PolicyUtils(object):
    """policy-client utils"""
    _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')

    @staticmethod
    def extract_policy_id(policy_name):
        """ policy_name  = policy_id + "." + <version> + "." + <extension>
        For instance,
        policy_name      = DCAE_alex.Config_alex_policy_number_1.3.xml
               policy_id = DCAE_alex.Config_alex_policy_number_1
            policy_scope = DCAE_alex
            policy_class = Config
          policy_version = 3
        type = extension = xml
               delimiter = "."
        policy_class_delimiter = "_"
        policy_name in PAP = DCAE_alex.alex_policy_number_1
        """
        if not policy_name:
            return
        return PolicyUtils._policy_name_ext.sub('', policy_name)

    @staticmethod
    def parse_policy_config(policy):
        """try parsing the config in policy."""
        if not policy:
            return policy
        config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
        if config:
            policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
        return policy

    @staticmethod
    def convert_to_policy(policy_body):
        """wrap policy_body received from policy-engine with policy_id."""
        if not policy_body:
            return None
        policy_name = policy_body.get(POLICY_NAME)
        policy_version = policy_body.get(POLICY_VERSION)
        if not policy_name or policy_version is None:
            return None
        policy_id = PolicyUtils.extract_policy_id(policy_name)
        if not policy_id:
            return None
        return {POLICY_ID:policy_id, POLICY_BODY:policy_body}

    @staticmethod
    def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
        """For some reason, the policy-engine returns all version of the policy_bodies.
        DCAE-Controller is only interested in the latest version
        """
        if not policy_bodies:
            return
        latest_policy_body = {}
        for policy_body in policy_bodies:
            policy_name = policy_body.get(POLICY_NAME)
            policy_version = policy_body.get(POLICY_VERSION)
            if not policy_name or policy_version is None or not policy_version.isdigit():
                continue
            if expected_versions and policy_version not in expected_versions:
                continue
            if ignore_policy_names and policy_name in ignore_policy_names:
                continue

            if (not latest_policy_body
                    or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)):
                latest_policy_body = policy_body

        return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))

    @staticmethod
    def select_latest_policies(policy_bodies):
        """For some reason, the policy-engine returns all version of the policy_bodies.
        DCAE-Controller is only interested in the latest versions
        """
        if not policy_bodies:
            return {}
        policies = {}
        for policy_body in policy_bodies:
            policy = PolicyUtils.convert_to_policy(policy_body)
            if not policy:
                continue
            policy_id = policy.get(POLICY_ID)
            policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
            if not policy_id or policy_version is None or not policy_version.isdigit():
                continue
            if (policy_id not in policies
                    or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
                policies[policy_id] = policy

        for policy_id in policies:
            policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])

        return policies