diff options
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r-- | ice_validator/tests/utils/nested_dict.py | 1 | ||||
-rw-r--r-- | ice_validator/tests/utils/nested_files.py | 69 | ||||
-rw-r--r-- | ice_validator/tests/utils/network_roles.py | 90 | ||||
-rw-r--r-- | ice_validator/tests/utils/ports.py | 189 | ||||
-rw-r--r-- | ice_validator/tests/utils/vm_types.py | 33 | ||||
-rw-r--r-- | ice_validator/tests/utils/volumes.py | 2 |
6 files changed, 197 insertions, 187 deletions
diff --git a/ice_validator/tests/utils/nested_dict.py b/ice_validator/tests/utils/nested_dict.py index 9bc3e99..24f7e5e 100644 --- a/ice_validator/tests/utils/nested_dict.py +++ b/ice_validator/tests/utils/nested_dict.py @@ -62,3 +62,4 @@ def is_dict_has_key(obj, key): '''return True/False `obj` is a dict and has `key` ''' return isinstance(obj, dict) and key in obj + diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py index 02f733d..c551646 100644 --- a/ice_validator/tests/utils/nested_files.py +++ b/ice_validator/tests/utils/nested_files.py @@ -43,17 +43,17 @@ from os import path import re -import yaml +from tests import cached_yaml as yaml -VERSION = "1.0.2" +VERSION = '1.0.2' def get_list_of_nested_files(yml, dirpath): - """ + ''' return a list of all nested files - """ + ''' - if not hasattr(yml, "items"): + if not hasattr(yml, 'items'): return [] nested_files = [] @@ -63,34 +63,41 @@ def get_list_of_nested_files(yml, dirpath): t = v["type"] if t.endswith(".yml") or t.endswith(".yaml"): filepath = path.join(dirpath, t) - with open(filepath) as fh: - t_yml = yaml.load(fh) - nested_files.append(filepath) - nested_files.extend(get_list_of_nested_files(t_yml, dirpath)) + if path.exists(filepath): + with open(filepath) as fh: + t_yml = yaml.load(fh) + nested_files.append(filepath) + nested_files.extend(get_list_of_nested_files(t_yml, dirpath)) elif t == "OS::Heat::ResourceGroup": - rdt = v.get("properties", {}).get("resource_def", {}).get("type", None) + rdt = (v.get("properties", {}) + .get("resource_def", {}) + .get("type", None)) if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")): filepath = path.join(dirpath, rdt) - with open(filepath) as fh: - rdt_yml = yaml.load(fh) - nested_files.append(filepath) - nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath)) + if path.exists(filepath): + with open(filepath) as fh: + rdt_yml = yaml.load(fh) + nested_files.append(filepath) + nested_files.extend( + get_list_of_nested_files(rdt_yml, dirpath)) if isinstance(v, dict): - nested_files.extend(get_list_of_nested_files(v, dirpath)) + nested_files.extend( + get_list_of_nested_files(v, dirpath)) elif isinstance(v, list): for d in v: - nested_files.extend(get_list_of_nested_files(d, dirpath)) + nested_files.extend( + get_list_of_nested_files(d, dirpath)) return nested_files def check_for_invalid_nesting(yml, yaml_file, dirpath): - """ + ''' return a list of all nested files - """ - if not hasattr(yml, "items"): + ''' + if not hasattr(yml, 'items'): return [] invalid_nesting = [] - p = re.compile("^[A-z]*::[A-z]*::[A-z]*$") + p = re.compile('^[A-z]*::[A-z]*::[A-z]*$') for v in yml.values(): if isinstance(v, dict) and "type" in v: @@ -102,9 +109,7 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath): if not isinstance(rd, dict) or "type" not in rd: invalid_nesting.append(yaml_file) continue - elif not p.match(rd["type"]) and not ( - rd["type"].endswith(".yml") or rd["type"].endswith(".yaml") - ): + elif not p.match(rd["type"]): filepath = path.join(dirpath, rd["type"]) else: continue @@ -115,11 +120,21 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath): yml = yaml.load(fh) except yaml.YAMLError as e: invalid_nesting.append(filepath) - print(e) # pylint: disable=superfluous-parens - invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath)) + print(e) # pylint: disable=superfluous-parens + invalid_nesting.extend(check_for_invalid_nesting( + yml, + filepath, + dirpath)) if isinstance(v, dict): - invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath)) + invalid_nesting.extend(check_for_invalid_nesting( + v, + yaml_file, + dirpath)) elif isinstance(v, list): for d in v: - invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath)) + invalid_nesting.extend(check_for_invalid_nesting( + d, + yaml_file, + dirpath)) return invalid_nesting + diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py index d4b2cce..bed3a5a 100644 --- a/ice_validator/tests/utils/network_roles.py +++ b/ice_validator/tests/utils/network_roles.py @@ -43,31 +43,36 @@ import socket def get_network_role_from_port(resource): - """ + ''' get the network role from a neutron port resource - """ + ''' if not isinstance(resource, dict): return None - if "type" not in resource: + if 'type' not in resource: return None - if resource["type"] != "OS::Neutron::Port": + if resource['type'] != 'OS::Neutron::Port': return None - if "properties" not in resource: + if 'properties' not in resource: return None formats = [ - ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], - ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], - ["network", "string", "external", re.compile(r"(.+?)_net_id")], - ["network", "string", "external", re.compile(r"(.+?)_net_name")], - ] + ["network", "string", "internal", + re.compile(r'int_(.+?)_net_id')], + ["network", "string", "internal", + re.compile(r'int_(.+?)_net_name')], + ["network", "string", "external", + re.compile(r'(.+?)_net_id')], + ["network", "string", "external", + re.compile(r'(.+?)_net_name')]] for k1, v1 in resource["properties"].items(): - if k1 != "network": + if k1 != 'network': continue # get the network id or name - network = v1.get("get_param") or v1.get("get_resource") + network = ( + v1.get('get_param') or + v1.get('get_resource')) if not network: continue @@ -79,28 +84,41 @@ def get_network_role_from_port(resource): return None +def get_network_roles(resources): + network_roles = [] + for v in resources.values(): + nr = get_network_role_from_port(v) + if nr: + network_roles.append(nr) + + return set(network_roles) + + def get_network_type_from_port(resource): - """ + ''' get the network type from a neutron port resource - """ + ''' if not isinstance(resource, dict): return None - if "type" not in resource: + if 'type' not in resource: return None - if resource["type"] != "OS::Neutron::Port": + if resource['type'] != 'OS::Neutron::Port': return None - if "properties" not in resource: + if 'properties' not in resource: return None formats = [ - ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], - ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], - ["network", "string", "external", re.compile(r"(.+?)_net_id")], - ["network", "string", "external", re.compile(r"(.+?)_net_name")], - ] + ["network", "string", "internal", + re.compile(r'int_(.+?)_net_id')], + ["network", "string", "internal", + re.compile(r'int_(.+?)_net_name')], + ["network", "string", "external", + re.compile(r'(.+?)_net_id')], + ["network", "string", "external", + re.compile(r'(.+?)_net_name')]] for k1, v1 in resource["properties"].items(): - if k1 != "network": + if k1 != 'network': continue if "get_param" not in v1: continue @@ -112,22 +130,22 @@ def get_network_type_from_port(resource): return None -def is_valid_ip_address(ip_address, ip_type="ipv4"): - """ +def is_valid_ip_address(ip_address, ip_type='ipv4'): + ''' check if an ip address is valid - """ - if ip_type == "ipv4": + ''' + if ip_type == 'ipv4': return is_valid_ipv4_address(ip_address) - elif ip_type == "ipv6": + elif ip_type == 'ipv6': return is_valid_ipv6_address(ip_address) return False def is_valid_ipv4_address(ip_address): - """ + ''' check if an ip address of the type ipv4 is valid - """ + ''' try: socket.inet_pton(socket.AF_INET, ip_address) except AttributeError: @@ -135,17 +153,17 @@ def is_valid_ipv4_address(ip_address): socket.inet_aton(ip_address) except (OSError, socket.error): return False - return ip_address.count(".") == 3 + return ip_address.count('.') == 3 except (OSError, socket.error): return False return True def is_valid_ipv6_address(ip_address): - """ + ''' check if an ip address of the type ipv6 is valid - """ + ''' try: socket.inet_pton(socket.AF_INET6, ip_address) except (OSError, socket.error): @@ -154,13 +172,13 @@ def is_valid_ipv6_address(ip_address): def property_uses_get_resource(resource, property_name): - """ + ''' returns true if a port's network property uses the get_resource function - """ + ''' if not isinstance(resource, dict): return False - if "properties" not in resource: + if 'properties' not in resource: return False for k1, v1 in resource["properties"].items(): if k1 != property_name: diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index e479201..a2ae8a9 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -43,125 +43,89 @@ from .vm_types import get_vm_type_for_nova_server import re -def is_valid_ip_address(ip_address, vm_type, network_role, port_property): - """ +def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type): + ''' Check the ip_address to make sure it is properly formatted and also contains {vm_type} and {network_role} - """ + ''' allowed_formats = [ - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_floating_ip"), - ], - [ - "allowed_address_pairs", - "string", - "external", - re.compile(r"(.+?)_floating_v6_ip"), - ], - [ - "allowed_address_pairs", - "string", - "external", - re.compile(r"(.+?)_floating_ip"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_ip_\d+"), - ], - ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], - ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")], - [ - "allowed_address_pairs", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "external", - re.compile(r"(.+?)_v6_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "external", - re.compile(r"(.+?)_ips"), - ], - ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")], - ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")], - ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], - ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")], - [ - "fixed_ips", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ips"), - ], - [ - "fixed_ips", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_ips"), - ], - ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")], - ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")], - ] + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_floating_ip')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_floating_v6_ip')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_floating_ip')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_ip_\d+')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_v6_ip_\d+')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_ip_\d+')], + ["allowed_address_pairs", "comma_delimited_list", + "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "internal", re.compile(r'(.+?)_int_(.+?)_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "external", re.compile(r'(.+?)_v6_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "external", re.compile(r'(.+?)_ips')], + ["fixed_ips", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], + ["fixed_ips", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_ip_\d+')], + ["fixed_ips", "string", "external", + re.compile(r'(.+?)_v6_ip_\d+')], + ["fixed_ips", "string", "external", + re.compile(r'(.+?)_ip_\d+')], + ["fixed_ips", "comma_delimited_list", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ips')], + ["fixed_ips", "comma_delimited_list", "internal", + re.compile(r'(.+?)_int_(.+?)_ips')], + ["fixed_ips", "comma_delimited_list", "external", + re.compile(r'(.+?)_v6_ips')], + ["fixed_ips", "comma_delimited_list", "external", + re.compile(r'(.+?)_ips')]] for v3 in allowed_formats: + if v3[1] != parameter_type: + continue if v3[0] != port_property: continue # check if pattern matches m = v3[3].match(ip_address) if m: - if v3[2] == "internal" and len(m.groups()) > 1: - return m.group(1) == vm_type and m.group(2) == network_role - elif v3[2] == "external" and len(m.groups()) > 0: + if (v3[2] == "internal" and + len(m.groups()) > 1): + return m.group(1) == vm_type and\ + m.group(2) == network_role + elif (v3[2] == "external" and + len(m.groups()) > 0): return m.group(1) == vm_type + "_" + network_role return False -def get_invalid_ip_addresses(resources, port_property): - """ +def get_invalid_ip_addresses(resources, port_property, parameters): + ''' Get a list of valid ip addresses for a heat resources section - """ + ''' invalid_ip_addresses = [] for k, v in resources.items(): if not isinstance(v, dict): continue - if "type" not in v: + if 'type' not in v: continue - if v["type"] not in "OS::Nova::Server": + if v['type'] not in 'OS::Nova::Server': continue - if "properties" not in v: + if 'properties' not in v: continue - if "networks" not in v["properties"]: + if 'networks' not in v['properties']: continue port_resource = None @@ -171,16 +135,16 @@ def get_invalid_ip_addresses(resources, port_property): continue # get all ports associated with the nova server - properties = v["properties"] - for network in properties["networks"]: + properties = v['properties'] + for network in properties['networks']: for k3, v3 in network.items(): - if k3 != "port": + if k3 != 'port': continue if not isinstance(v3, dict): continue - if "get_resource" in v3: - port_id = v3["get_resource"] + if 'get_resource' in v3: + port_id = v3['get_resource'] if not resources[port_id]: continue port_resource = resources[port_id] @@ -199,15 +163,23 @@ def get_invalid_ip_addresses(resources, port_property): continue if "get_param" not in v2["ip_address"]: continue - ip_address = v2["ip_address"]["get_param"] if isinstance(ip_address, list): ip_address = ip_address[0] - valid_ip_address = is_valid_ip_address( - ip_address, vm_type, network_role, port_property - ) + if ip_address not in parameters: + continue + + parameter_type = parameters[ip_address].get("type") + if not parameter_type: + continue + + valid_ip_address = is_valid_ip_address(ip_address, + vm_type, + network_role, + port_property, + parameter_type) if not valid_ip_address: invalid_ip_addresses.append(ip_address) @@ -216,14 +188,15 @@ def get_invalid_ip_addresses(resources, port_property): def is_reserved_port(port_id): - """ + ''' Checks to see if the resource id for a port follows the reserve port concept - """ + ''' formats = [ - ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")], - ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")], - ] + ["port_id", + re.compile(r'reserve_port_(.+?)_floating_ip_\d+')], + ["port_id", + re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]] for f in formats: m = f[1].match(port_id.lower()) if m and m.group(1): diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py index 78006b9..6802666 100644 --- a/ice_validator/tests/utils/vm_types.py +++ b/ice_validator/tests/utils/vm_types.py @@ -42,33 +42,36 @@ import re def get_vm_types_for_resource(resource): - """ + ''' Get all unique vm_types for a resource Notes: - Returns set([]) if the resource is not formatted properly, the passed resource is not a nova server - If more than one vm_type is detected all vm_types will be returned - """ + ''' if not isinstance(resource, dict): return set() - if "type" not in resource: + if 'type' not in resource: return set() - if resource["type"] != "OS::Nova::Server": + if resource['type'] != 'OS::Nova::Server': return set() - if "properties" not in resource: + if 'properties' not in resource: return set() key_values = ["name", "flavor", "image"] key_value_formats = [ - ["name", "string", re.compile(r"(.+?)_name_\d+")], - ["name", "comma_delimited_list", re.compile(r"(.+?)_names")], - ["flavor", "string", re.compile(r"(.+?)_flavor_name")], - ["image", "string", re.compile(r"(.+?)_image_name")], - ] + ["name", "string", + re.compile(r'(.+?)_name_\d+')], + ["name", "comma_delimited_list", + re.compile(r'(.+?)_names')], + ["flavor", "string", + re.compile(r'(.+?)_flavor_name')], + ["image", "string", + re.compile(r'(.+?)_image_name')]] vm_types = [] - for k2, v2 in resource["properties"].items(): + for k2, v2 in resource['properties'].items(): if k2 not in key_values: continue if "get_param" not in v2: @@ -86,12 +89,12 @@ def get_vm_types_for_resource(resource): def get_vm_type_for_nova_server(resource): - """ + ''' Get the vm_type for a resource Note: Returns None if not exactly one vm_type is detected, if the resource is not formatted properly, or the passed resource is not a nova server - """ + ''' vm_types = get_vm_types_for_resource(resource) # if more than one vm_type was identified, return None @@ -102,10 +105,10 @@ def get_vm_type_for_nova_server(resource): def get_vm_types(resources): - """ + ''' Get all vm_types for a list of heat resources, do note that some of the values retrieved may be invalid - """ + ''' vm_types = [] for v in resources.values(): vm_types.extend(list(get_vm_types_for_resource(v))) diff --git a/ice_validator/tests/utils/volumes.py b/ice_validator/tests/utils/volumes.py index c64c0ee..40731bf 100644 --- a/ice_validator/tests/utils/volumes.py +++ b/ice_validator/tests/utils/volumes.py @@ -42,7 +42,7 @@ """ from os import path -import yaml +from tests import cached_yaml as yaml VERSION = '1.0.0' |