aboutsummaryrefslogtreecommitdiffstats
path: root/kubernetes/aaf/components/aaf-sshsm/components/aaf-sshsm-abrmd
diff options
context:
space:
mode:
authorLukasz Rajewski <lukasz.rajewski@t-mobile.pl>2024-09-12 06:34:41 +0000
committerGerrit Code Review <gerrit@onap.org>2024-09-12 06:34:41 +0000
commit0765a2124569cfa38a2828f2c9d88cf8cca8eedc (patch)
tree90c78e2dfe39dd6e97b476d246a0b70b2b20d274 /kubernetes/aaf/components/aaf-sshsm/components/aaf-sshsm-abrmd
parent6de7e5a6e6f8b06ee420b5926efd3d53c639e1b1 (diff)
parent4b5e5842a6a83ded838417aa863530c0ebd1ab5e (diff)
Merge "[STRIMZI] Solve Security Policy violations"HEADmaster
Diffstat (limited to 'kubernetes/aaf/components/aaf-sshsm/components/aaf-sshsm-abrmd')
0 files changed, 0 insertions, 0 deletions
8' href='#n148'>148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
# -------------------------------------------------------------------------
#   Copyright (c) 2015-2017 AT&T Intellectual Property
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# -------------------------------------------------------------------------
#
import copy
import json
import re

import yaml

from osdf.utils.programming_utils import dot_notation

policy_config_mapping = yaml.safe_load(open('config/has_config.yaml')).get('policy_config_mapping')


def get_opt_query_data(req_json, policies):
    """
    Fetch service and order specific details from the requestParameters field of a request.
    :param req_json: a request file
    :param policies: A set of policies
    :return: A dictionary with service and order-specific attributes.
    """
    req_param_dict = {}
    if 'requestParameters' in req_json["placementInfo"]:
        req_params = req_json["placementInfo"]["requestParameters"]
        for policy in policies:
            for queryProp in policy[list(policy.keys())[0]]['properties']['queryProperties']:
                attr_val = queryProp['value'] if 'value' in queryProp and queryProp['value'] != "" \
                    else dot_notation(req_params, queryProp['attribute_location'])
                if attr_val is not None:
                    req_param_dict.update({queryProp['attribute']: attr_val})
    return req_param_dict


def gen_optimization_policy(vnf_list, optimization_policy):
    """Generate optimization policy details to pass to Conductor
    :param vnf_list: List of vnf's to used in placement request
    :param optimization_policy: optimization objective policy information provided in the incoming request
    :return: List of optimization objective policies in a format required by Conductor
    """
    optimization_policy_list = []
    for policy in optimization_policy:
        content = policy[list(policy.keys())[0]]['properties']
        parameter_list = []
        parameters = ["cloud_version", "hpa_score"]

        for attr in content['objectiveParameter']['parameterAttributes']:
            parameter = attr['parameter'] if attr['parameter'] in parameters else attr['parameter']+"_between"
            default, vnfs = get_matching_vnfs(attr['resources'], vnf_list)
            for vnf in vnfs:
                value = [vnf] if attr['parameter'] in parameters else [attr['customerLocationInfo'], vnf]
                parameter_list.append({
                    attr['operator']: [attr['weight'], {parameter: value}]
                })

        optimization_policy_list.append({
                content['objective']: {content['objectiveParameter']['operator']: parameter_list }
        })
    return optimization_policy_list


def get_matching_vnfs(resources, vnf_list, match_type="intersection"):
    """Get a list of matching VNFs from the list of resources
    :param resources:
    :param vnf_list: List of vnfs to used in placement request
    :param match_type: "intersection" or "all" or "any" (any => send all_vnfs if there is any intersection)
    :return: List of matching VNFs
    """
    # Check if it is a default policy
    default = True if resources == [] else False
    resources_lcase = [x.lower() for x in resources] if not default else [x.lower() for x in vnf_list]
    if match_type == "all":  # don't bother with any comparisons
        return default, resources if set(resources_lcase) <= set(vnf_list) else None
    common_vnfs = set(vnf_list) & set(resources_lcase) if not default else set(vnf_list)
    common_resources = [x for x in resources if x.lower() in common_vnfs] if not default else list(common_vnfs)
    if match_type == "intersection":  # specifically requested intersection
        return default, list(common_resources)
    return default, resources if common_vnfs else None  # "any" match => all resources to be returned


def gen_policy_instance(vnf_list, resource_policy, match_type="intersection", rtype=None):
    """Generate a list of policies
    :param vnf_list: List of vnf's to used in placement request
    :param resource_policy: policy for this specific resource
    :param match_type: How to match the vnf_names with the vnf_list (intersection or "any")
             intersection => return intersection; "any" implies return all vnf_names if intersection is not null
    :param rtype: resource type (e.g. resourceRegionProperty or resourceInstanceProperty)
             None => no controller information added to the policy specification to Conductor
    :return: resource policy list in a format required by Conductor
    """
    resource_policy_list = []
    related_policies = []
    for policy in resource_policy:
        pc = policy[list(policy.keys())[0]]
        default, demands = get_matching_vnfs(pc['properties']['resources'], vnf_list, match_type=match_type)
        resource = {pc['properties']['identity']: {'type': pc['type'], 'demands': demands}}

        if rtype:
            resource[pc['properties']['identity']]['properties'] = {'controller': pc[rtype]['controller'],
                                                                    'request': json.loads(pc[rtype]['request'])}
        if demands and len(demands) != 0:
            # The default policy shall not override the specific policy that already appended
            if default:
                for d in demands:
                    resource_repeated = True \
                        if {pc['properties']['identity']: {'type': pc['type'], 'demands': d}} \
                           in resource_policy_list else False
                    if resource_repeated:
                        continue
                    else:
                        resource_policy_list.append(
                            {pc['properties']['identity']: {'type': pc['type'], 'demands': d }})
                        policy[list(policy.keys())[0]]['properties']['resources'] = d
                        related_policies.append(policy)
            # Need to override the default policies, here delete the outdated policy stored in the db
            if resource in resource_policy_list:
                for pc in related_policies:
                    if pc[list(pc.keys()[0])]['properties']['resources'] == resource:
                        related_policies.remove(pc)
                resource_policy_list.remove(resource)
            related_policies.append(policy)
            resource_policy_list.append(resource)

    return resource_policy_list, related_policies


def gen_resource_instance_policy(vnf_list, resource_instance_policy):
    """Get policies governing resource instances in order to populate the Conductor API call"""
    cur_policies, _ = gen_policy_instance(vnf_list, resource_instance_policy, rtype='resourceInstanceProperty')
    return cur_policies


def gen_resource_region_policy(vnf_list, resource_region_policy):
    """Get policies governing resource region in order to populate the Conductor API call"""
    cur_policies, _ = gen_policy_instance(vnf_list, resource_region_policy, rtype='resourceRegionProperty')
    return cur_policies


def gen_inventory_group_policy(vnf_list, inventory_group_policy):
    """Get policies governing inventory group in order to populate the Conductor API call"""
    cur_policies, _ = gen_policy_instance(vnf_list, inventory_group_policy, rtype=None)
    return cur_policies


def gen_reservation_policy(vnf_list, reservation_policy):
    """Get policies governing resource instances in order to populate the Conductor API call"""
    cur_policies, _ = gen_policy_instance(vnf_list, reservation_policy, rtype='instanceReservationProperty')
    return cur_policies


def gen_distance_to_location_policy(vnf_list, distance_to_location_policy):
    """Get policies governing distance-to-location for VNFs in order to populate the Conductor API call"""
    cur_policies, related_policies = gen_policy_instance(vnf_list, distance_to_location_policy, rtype=None)
    for p_new, p_main in zip(cur_policies, related_policies):  # add additional fields to each policy
        properties = p_main[list(p_main.keys())[0]]['properties']['distanceProperties']
        pcp_d = properties['distance']
        p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = {
            'distance': pcp_d['operator'] + " " + pcp_d['value'].lower() + " " + pcp_d['unit'].lower(),
            'location': properties['locationInfo']
        }
    return cur_policies


def gen_attribute_policy(vnf_list, attribute_policy):
    """Get policies governing attributes of VNFs in order to populate the Conductor API call"""
    cur_policies, related_policies = gen_policy_instance(vnf_list, attribute_policy, rtype=None)
    for p_new, p_main in zip(cur_policies, related_policies):  # add additional fields to each policy
        properties = p_main[list(p_main.keys())[0]]['properties']['cloudAttributeProperty']
        attribute_mapping = policy_config_mapping['filtering_attributes']  # wanted attributes and mapping
        p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = {
            'evaluate': dict((k, properties.get(attribute_mapping.get(k))) for k in attribute_mapping.keys())
        }
    return cur_policies  # cur_policies gets updated in place...


def gen_zone_policy(vnf_list, zone_policy):
    """Get zone policies in order to populate the Conductor API call"""
    cur_policies, related_policies = gen_policy_instance(vnf_list, zone_policy, match_type="all", rtype=None)
    for p_new, p_main in zip(cur_policies, related_policies):  # add additional fields to each policy
        pmz = p_main[list(p_main.keys())[0]]['properties']['affinityProperties']
        p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \
            {'category': pmz['category'], 'qualifier': pmz['qualifier']}
    return cur_policies


def gen_capacity_policy(vnf_list, capacity_policy):
    """Get zone policies in order to populate the Conductor API call"""
    cur_policies, related_policies = gen_policy_instance(vnf_list, capacity_policy, rtype=None)
    for p_new, p_main in zip(cur_policies, related_policies):  # add additional fields to each policy
        pmz = p_main[list(p_main.keys())[0]]['properties']['capacityProperty']
        p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \
            {"controller": pmz['controller'], 'request': json.loads(pmz['request'])}
    return cur_policies


def gen_hpa_policy(vnf_list, hpa_policy):
    """Get zone policies in order to populate the Conductor API call"""
    cur_policies, related_policies = gen_policy_instance(vnf_list, hpa_policy, rtype=None)
    for p_new, p_main in zip(cur_policies, related_policies):  # add additional fields to each policy
        p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \
            {'evaluate': p_main[list(p_main.keys())[0]]['properties']['flavorFeatures']}
    return cur_policies


def get_augmented_policy_attributes(policy_property, demand):
    """Get policy attributes and augment them using policy_config_mapping and demand information"""
    attributes = copy.copy(policy_property['attributes'])
    remapping = policy_config_mapping['remapping']
    extra = dict((x, demand['resourceModelInfo'][remapping[x]]) for x in attributes if x in remapping)
    attributes.update(extra)
    return attributes


def get_candidates_demands(demand):
    """Get demands related to candidates; e.g. excluded/required"""
    res = {}
    for k, v in policy_config_mapping['candidates'].items():
        if k not in demand:
            continue
        res[v] = [{'inventory_type': x['identifierType'], 'candidate_id': x['identifiers']} for x in demand[k]]
    return res


def get_policy_properties(demand, policies):
    """Get policy_properties for cases where there is a match with the demand"""
    for policy in policies:
        policy_demands = set([x.lower() for x in policy[list(policy.keys())[0]]['properties']['resources']])
        if policy_demands and demand['resourceModuleName'].lower() not in policy_demands:
            continue  # no match for this policy
        elif policy_demands == set(): # Append resource name for default policy
            policy[list(policy.keys())[0]]['properties'].update(resources=list(demand.get('resourceModuleName')))
        for policy_property in policy[list(policy.keys())[0]]['properties']['vnfProperties']:
            yield policy_property


def get_demand_properties(demand, policies):
    """Get list demand properties objects (named tuples) from policy"""
    demand_properties = []
    for policy_property in get_policy_properties(demand, policies):
        prop = dict(inventory_provider=policy_property['inventoryProvider'],
                    inventory_type=policy_property['inventoryType'],
                    service_type=demand['serviceResourceId'],
                    service_resource_id=demand['serviceResourceId'])

        prop.update({'unique': policy_property['unique']} if 'unique' in policy_property and
                                                             policy_property['unique'] else {})
        prop['filtering_attributes'] = dict()
        prop['filtering_attributes'].update({'global-customer-id': policy_property['customerId']}
                                  if policy_property['customerId'] else {})
        prop['filtering_attributes'].update({'model-invariant-id': demand['resourceModelInfo']['modelInvariantId']}
                                  if demand['resourceModelInfo']['modelInvariantId'] else {})
        prop['filtering_attributes'].update({'model-version-id': demand['resourceModelInfo']['modelVersionId']}
                                  if demand['resourceModelInfo']['modelVersionId'] else {})
        prop['filtering_attributes'].update({'equipment-role': policy_property['equipmentRole']}
                                  if policy_property['equipmentRole'] else {})

        if policy_property.get('attributes'):
            for attr_key, attr_val in policy_property['attributes'].items():
                update_converted_attribute(attr_key, attr_val, prop, 'filtering_attributes')
        if policy_property.get('passthroughAttributes'):
            prop['passthrough_attributes'] = dict()
            for attr_key, attr_val in policy_property['passthroughAttributes'].items():
                update_converted_attribute(attr_key, attr_val, prop, 'passthrough_attributes')

        prop.update(get_candidates_demands(demand))
        demand_properties.append(prop)
    return demand_properties


def update_converted_attribute(attr_key, attr_val, properties, attribute_type):
    """
    Updates dictonary of attributes with one specified in the arguments.
    Automatically translates key namr from camelCase to hyphens
    :param attribute_type: attribute section name
    :param attr_key: key of the attribute
    :param attr_val: value of the attribute
    :param properties: dictionary with attributes to update
    :return:
    """
    if attr_val:
        remapping = policy_config_mapping[attribute_type]
        if remapping.get(attr_key):
            key_value = remapping.get(attr_key)
        else:
            key_value = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', attr_key)
            key_value = re.sub('([a-z0-9])([A-Z])', r'\1-\2', key_value).lower()
        properties[attribute_type].update({key_value: attr_val})


def gen_demands(req_json, vnf_policies):
    """Generate list of demands based on request and VNF policies
    :param req_json: Request object from the client (e.g. MSO)
    :param vnf_policies: Policies associated with demand resources (e.g. from grouped_policies['vnfPolicy'])
    :return: list of demand parameters to populate the Conductor API call
    """
    demand_dictionary = {}
    for demand in req_json['placementInfo']['placementDemands']:
        prop = get_demand_properties(demand, vnf_policies)
        if len(prop) > 0:
            demand_dictionary.update({demand['resourceModuleName']: prop})
    return demand_dictionary