diff options
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r-- | ice_validator/tests/utils/__init__.py | 6 | ||||
-rw-r--r-- | ice_validator/tests/utils/nested_files.py | 50 | ||||
-rw-r--r-- | ice_validator/tests/utils/network_roles.py | 82 | ||||
-rw-r--r-- | ice_validator/tests/utils/ports.py | 176 | ||||
-rw-r--r-- | ice_validator/tests/utils/vm_types.py | 34 |
5 files changed, 177 insertions, 171 deletions
diff --git a/ice_validator/tests/utils/__init__.py b/ice_validator/tests/utils/__init__.py index e8f24cd..ec11176 100644 --- a/ice_validator/tests/utils/__init__.py +++ b/ice_validator/tests/utils/__init__.py @@ -2,11 +2,11 @@ # ============LICENSE_START======================================================= # org.onap.vvp/validation-scripts # =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# Copyright © 2018 AT&T Intellectual Property. All rights reserved. # =================================================================== # # Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); +# under the Apache License, Version 2.0 (the "License"); # you may not use this software except in compliance with the License. # You may obtain a copy of the License at # @@ -21,7 +21,7 @@ # # # Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# under the Creative Commons License, Attribution 4.0 Intl. (the "License"); # you may not use this documentation except in compliance with the License. # You may obtain a copy of the License at # diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py index 39306eb..02f733d 100644 --- a/ice_validator/tests/utils/nested_files.py +++ b/ice_validator/tests/utils/nested_files.py @@ -45,15 +45,15 @@ from os import path import re import yaml -VERSION = '1.0.2' +VERSION = "1.0.2" def get_list_of_nested_files(yml, dirpath): - ''' + """ return a list of all nested files - ''' + """ - if not hasattr(yml, 'items'): + if not hasattr(yml, "items"): return [] nested_files = [] @@ -68,34 +68,29 @@ def get_list_of_nested_files(yml, dirpath): nested_files.append(filepath) nested_files.extend(get_list_of_nested_files(t_yml, dirpath)) elif t == "OS::Heat::ResourceGroup": - rdt = (v.get("properties", {}) - .get("resource_def", {}) - .get("type", None)) + rdt = v.get("properties", {}).get("resource_def", {}).get("type", None) if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")): filepath = path.join(dirpath, rdt) with open(filepath) as fh: rdt_yml = yaml.load(fh) nested_files.append(filepath) - nested_files.extend( - get_list_of_nested_files(rdt_yml, dirpath)) + nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath)) if isinstance(v, dict): - nested_files.extend( - get_list_of_nested_files(v, dirpath)) + nested_files.extend(get_list_of_nested_files(v, dirpath)) elif isinstance(v, list): for d in v: - nested_files.extend( - get_list_of_nested_files(d, dirpath)) + nested_files.extend(get_list_of_nested_files(d, dirpath)) return nested_files def check_for_invalid_nesting(yml, yaml_file, dirpath): - ''' + """ return a list of all nested files - ''' - if not hasattr(yml, 'items'): + """ + if not hasattr(yml, "items"): return [] invalid_nesting = [] - p = re.compile('^[A-z]*::[A-z]*::[A-z]*$') + p = re.compile("^[A-z]*::[A-z]*::[A-z]*$") for v in yml.values(): if isinstance(v, dict) and "type" in v: @@ -108,8 +103,8 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath): invalid_nesting.append(yaml_file) continue elif not p.match(rd["type"]) and not ( - rd["type"].endswith(".yml") - or rd["type"].endswith(".yaml")): + rd["type"].endswith(".yml") or rd["type"].endswith(".yaml") + ): filepath = path.join(dirpath, rd["type"]) else: continue @@ -120,20 +115,11 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath): yml = yaml.load(fh) except yaml.YAMLError as e: invalid_nesting.append(filepath) - print(e) # pylint: disable=superfluous-parens - invalid_nesting.extend(check_for_invalid_nesting( - yml, - filepath, - dirpath)) + print(e) # pylint: disable=superfluous-parens + invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath)) if isinstance(v, dict): - invalid_nesting.extend(check_for_invalid_nesting( - v, - yaml_file, - dirpath)) + invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath)) elif isinstance(v, list): for d in v: - invalid_nesting.extend(check_for_invalid_nesting( - d, - yaml_file, - dirpath)) + invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath)) return invalid_nesting diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py index 3f8d6b8..d4b2cce 100644 --- a/ice_validator/tests/utils/network_roles.py +++ b/ice_validator/tests/utils/network_roles.py @@ -43,37 +43,31 @@ import socket def get_network_role_from_port(resource): - ''' + """ get the network role from a neutron port resource - ''' + """ if not isinstance(resource, dict): return None - if 'type' not in resource: + if "type" not in resource: return None - if resource['type'] != 'OS::Neutron::Port': + if resource["type"] != "OS::Neutron::Port": return None - if 'properties' not in resource: + if "properties" not in resource: return None formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')], - ] + ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], + ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], + ["network", "string", "external", re.compile(r"(.+?)_net_id")], + ["network", "string", "external", re.compile(r"(.+?)_net_name")], + ] for k1, v1 in resource["properties"].items(): - if k1 != 'network': + if k1 != "network": continue # get the network id or name - network = ( - v1.get('get_param') or - v1.get('get_resource')) + network = v1.get("get_param") or v1.get("get_resource") if not network: continue @@ -86,31 +80,27 @@ def get_network_role_from_port(resource): def get_network_type_from_port(resource): - ''' + """ get the network type from a neutron port resource - ''' + """ if not isinstance(resource, dict): return None - if 'type' not in resource: + if "type" not in resource: return None - if resource['type'] != 'OS::Neutron::Port': + if resource["type"] != "OS::Neutron::Port": return None - if 'properties' not in resource: + if "properties" not in resource: return None formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')], - ] + ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], + ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], + ["network", "string", "external", re.compile(r"(.+?)_net_id")], + ["network", "string", "external", re.compile(r"(.+?)_net_name")], + ] for k1, v1 in resource["properties"].items(): - if k1 != 'network': + if k1 != "network": continue if "get_param" not in v1: continue @@ -122,22 +112,22 @@ def get_network_type_from_port(resource): return None -def is_valid_ip_address(ip_address, ip_type='ipv4'): - ''' +def is_valid_ip_address(ip_address, ip_type="ipv4"): + """ check if an ip address is valid - ''' - if ip_type == 'ipv4': + """ + if ip_type == "ipv4": return is_valid_ipv4_address(ip_address) - elif ip_type == 'ipv6': + elif ip_type == "ipv6": return is_valid_ipv6_address(ip_address) return False def is_valid_ipv4_address(ip_address): - ''' + """ check if an ip address of the type ipv4 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET, ip_address) except AttributeError: @@ -145,17 +135,17 @@ def is_valid_ipv4_address(ip_address): socket.inet_aton(ip_address) except (OSError, socket.error): return False - return ip_address.count('.') == 3 + return ip_address.count(".") == 3 except (OSError, socket.error): return False return True def is_valid_ipv6_address(ip_address): - ''' + """ check if an ip address of the type ipv6 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET6, ip_address) except (OSError, socket.error): @@ -164,13 +154,13 @@ def is_valid_ipv6_address(ip_address): def property_uses_get_resource(resource, property_name): - ''' + """ returns true if a port's network property uses the get_resource function - ''' + """ if not isinstance(resource, dict): return False - if 'properties' not in resource: + if "properties" not in resource: return False for k1, v1 in resource["properties"].items(): if k1 != property_name: diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index c9d4be9..e479201 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -44,53 +44,93 @@ import re def is_valid_ip_address(ip_address, vm_type, network_role, port_property): - ''' + """ Check the ip_address to make sure it is properly formatted and also contains {vm_type} and {network_role} - ''' + """ allowed_formats = [ - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_ips')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_ips')], - ] + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_ip_\d+"), + ], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_ips"), + ], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")], + ] for v3 in allowed_formats: if v3[0] != port_property: @@ -98,33 +138,30 @@ def is_valid_ip_address(ip_address, vm_type, network_role, port_property): # check if pattern matches m = v3[3].match(ip_address) if m: - if (v3[2] == "internal" and - len(m.groups()) > 1): - return m.group(1) == vm_type and\ - m.group(2) == network_role - elif (v3[2] == "external" and - len(m.groups()) > 0): + if v3[2] == "internal" and len(m.groups()) > 1: + return m.group(1) == vm_type and m.group(2) == network_role + elif v3[2] == "external" and len(m.groups()) > 0: return m.group(1) == vm_type + "_" + network_role return False def get_invalid_ip_addresses(resources, port_property): - ''' + """ Get a list of valid ip addresses for a heat resources section - ''' + """ invalid_ip_addresses = [] for k, v in resources.items(): if not isinstance(v, dict): continue - if 'type' not in v: + if "type" not in v: continue - if v['type'] not in 'OS::Nova::Server': + if v["type"] not in "OS::Nova::Server": continue - if 'properties' not in v: + if "properties" not in v: continue - if 'networks' not in v['properties']: + if "networks" not in v["properties"]: continue port_resource = None @@ -134,16 +171,16 @@ def get_invalid_ip_addresses(resources, port_property): continue # get all ports associated with the nova server - properties = v['properties'] - for network in properties['networks']: + properties = v["properties"] + for network in properties["networks"]: for k3, v3 in network.items(): - if k3 != 'port': + if k3 != "port": continue if not isinstance(v3, dict): continue - if 'get_resource' in v3: - port_id = v3['get_resource'] + if "get_resource" in v3: + port_id = v3["get_resource"] if not resources[port_id]: continue port_resource = resources[port_id] @@ -168,10 +205,9 @@ def get_invalid_ip_addresses(resources, port_property): if isinstance(ip_address, list): ip_address = ip_address[0] - valid_ip_address = is_valid_ip_address(ip_address, - vm_type, - network_role, - port_property) + valid_ip_address = is_valid_ip_address( + ip_address, vm_type, network_role, port_property + ) if not valid_ip_address: invalid_ip_addresses.append(ip_address) @@ -180,16 +216,14 @@ def get_invalid_ip_addresses(resources, port_property): def is_reserved_port(port_id): - ''' + """ Checks to see if the resource id for a port follows the reserve port concept - ''' + """ formats = [ - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_ip_\d+')], - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')], - ] + ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")], + ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")], + ] for f in formats: m = f[1].match(port_id.lower()) if m and m.group(1): diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py index 41da7d9..78006b9 100644 --- a/ice_validator/tests/utils/vm_types.py +++ b/ice_validator/tests/utils/vm_types.py @@ -42,37 +42,33 @@ import re def get_vm_types_for_resource(resource): - ''' + """ Get all unique vm_types for a resource Notes: - Returns set([]) if the resource is not formatted properly, the passed resource is not a nova server - If more than one vm_type is detected all vm_types will be returned - ''' + """ if not isinstance(resource, dict): return set() - if 'type' not in resource: + if "type" not in resource: return set() - if resource['type'] != 'OS::Nova::Server': + if resource["type"] != "OS::Nova::Server": return set() - if 'properties' not in resource: + if "properties" not in resource: return set() key_values = ["name", "flavor", "image"] key_value_formats = [ - ["name", "string", - re.compile(r'(.+?)_name_\d+')], - ["name", "comma_delimited_list", - re.compile(r'(.+?)_names')], - ["flavor", "string", - re.compile(r'(.+?)_flavor_name')], - ["image", "string", - re.compile(r'(.+?)_image_name')], - ] + ["name", "string", re.compile(r"(.+?)_name_\d+")], + ["name", "comma_delimited_list", re.compile(r"(.+?)_names")], + ["flavor", "string", re.compile(r"(.+?)_flavor_name")], + ["image", "string", re.compile(r"(.+?)_image_name")], + ] vm_types = [] - for k2, v2 in resource['properties'].items(): + for k2, v2 in resource["properties"].items(): if k2 not in key_values: continue if "get_param" not in v2: @@ -90,12 +86,12 @@ def get_vm_types_for_resource(resource): def get_vm_type_for_nova_server(resource): - ''' + """ Get the vm_type for a resource Note: Returns None if not exactly one vm_type is detected, if the resource is not formatted properly, or the passed resource is not a nova server - ''' + """ vm_types = get_vm_types_for_resource(resource) # if more than one vm_type was identified, return None @@ -106,10 +102,10 @@ def get_vm_type_for_nova_server(resource): def get_vm_types(resources): - ''' + """ Get all vm_types for a list of heat resources, do note that some of the values retrieved may be invalid - ''' + """ vm_types = [] for v in resources.values(): vm_types.extend(list(get_vm_types_for_resource(v))) |