From 1f4df7c7ad27b23773ad9cdbe4db1632ce388cf1 Mon Sep 17 00:00:00 2001 From: "stark, steven" Date: Mon, 17 Dec 2018 12:43:02 -0800 Subject: [VVP] updating validation scripts in dublin - adding backlog of new validation scripts for dublin - updating existing tests - removing outdated tests Issue-ID: VVP-123 Change-Id: Ib8260889ac957c1dd28d8ede450fc8edc6fb0ec0 Signed-off-by: stark, steven --- ice_validator/tests/utils/nested_dict.py | 30 ++-- ice_validator/tests/utils/nested_files.py | 239 +++++++++++++++++++------ ice_validator/tests/utils/nested_iterables.py | 65 ++++--- ice_validator/tests/utils/network_roles.py | 189 +++++++++---------- ice_validator/tests/utils/ports.py | 201 ++++++++++++--------- ice_validator/tests/utils/vm_types.py | 33 ++-- ice_validator/tests/utils/volumes.py | 18 +- ice_validator/tests/utils/yaml_custom_utils.py | 7 +- 8 files changed, 472 insertions(+), 310 deletions(-) (limited to 'ice_validator/tests/utils') diff --git a/ice_validator/tests/utils/nested_dict.py b/ice_validator/tests/utils/nested_dict.py index 24f7e5e..692ef5f 100644 --- a/ice_validator/tests/utils/nested_dict.py +++ b/ice_validator/tests/utils/nested_dict.py @@ -38,28 +38,30 @@ # ECOMP is a trademark and service mark of AT&T Intellectual Property. # -'''nested_dict.get -''' +"""nested_dict.get +""" -VERSION = '1.0.0' +VERSION = "1.1.1" -def get(dic, *keys): - '''Return the value of the last key given a (nested) dict - and list of keys. If any key is missing, or if the value - of any key except the last is not a dict, then None is returned. - ''' +def get(dic, *keys, **kwargs): + """Return the value of the last key given a (nested) dict and + list of keys. If any key is missing, or if the value of any key + except the last is not a dict, then the default value is returned. + The default value may be passed in using the keyword 'default=', + otherwise the default value is None. + """ d = dic + default = kwargs.get("default", None) for key in keys: - if hasattr(d, 'get'): - d = d.get(key) + if hasattr(d, "get"): + d = d.get(key, default) else: - return None + return default return d def is_dict_has_key(obj, key): - '''return True/False `obj` is a dict and has `key` - ''' + """return True/False `obj` is a dict and has `key` + """ return isinstance(obj, dict) and key in obj - diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py index c551646..aff5a6b 100644 --- a/ice_validator/tests/utils/nested_files.py +++ b/ice_validator/tests/utils/nested_files.py @@ -44,16 +44,77 @@ from os import path import re from tests import cached_yaml as yaml +from tests.structures import Heat -VERSION = '1.0.2' +VERSION = "1.4.0" + +""" +test nesting depth +0 -> 1 -> 2 -> too deep. 
+""" +MAX_DEPTH = 3 + + +def check_for_invalid_nesting( # pylint: disable=too-many-branches + yml, yaml_file, dirpath +): + """ + return a list of all nested files + """ + if not hasattr(yml, "items"): + return [] + invalid_nesting = [] + p = re.compile("^[A-z]*::[A-z]*::[A-z]*$") + + for v in yml.values(): + if isinstance(v, dict) and "type" in v: + t = v["type"] + if t.endswith(".yml") or t.endswith(".yaml"): + filepath = path.join(dirpath, t) + elif t == "OS::Heat::ResourceGroup": + rd = v["properties"]["resource_def"] + if not isinstance(rd, dict) or "type" not in rd: + invalid_nesting.append(yaml_file) + continue + elif not p.match(rd["type"]): + filepath = path.join(dirpath, rd["type"]) + else: + continue + else: + continue + try: + with open(filepath) as fh: + yml = yaml.load(fh) + except yaml.YAMLError as e: + invalid_nesting.append(filepath) + print(e) # pylint: disable=superfluous-parens + invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath)) + if isinstance(v, dict): + invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath)) + elif isinstance(v, list): + for d in v: + invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath)) + return invalid_nesting + + +def get_dict_of_nested_files(yml, dirpath): + """Return dict. + key: resource id in yml which references a nested file. + value: the nested file name. + Nested files are either referenced through "type", or + for OS::Heat::ResourceGroup, through "resource_def type". + """ + nested_files = get_type_nested_files(yml, dirpath) + nested_files.update(get_resourcegroup_nested_files(yml, dirpath)) + return nested_files def get_list_of_nested_files(yml, dirpath): - ''' + """ return a list of all nested files - ''' + """ - if not hasattr(yml, 'items'): + if not hasattr(yml, "items"): return [] nested_files = [] @@ -69,72 +130,132 @@ def get_list_of_nested_files(yml, dirpath): nested_files.append(filepath) nested_files.extend(get_list_of_nested_files(t_yml, dirpath)) elif t == "OS::Heat::ResourceGroup": - rdt = (v.get("properties", {}) - .get("resource_def", {}) - .get("type", None)) + rdt = v.get("properties", {}).get("resource_def", {}).get("type", None) if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")): filepath = path.join(dirpath, rdt) if path.exists(filepath): with open(filepath) as fh: rdt_yml = yaml.load(fh) nested_files.append(filepath) - nested_files.extend( - get_list_of_nested_files(rdt_yml, dirpath)) + nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath)) if isinstance(v, dict): - nested_files.extend( - get_list_of_nested_files(v, dirpath)) + nested_files.extend(get_list_of_nested_files(v, dirpath)) elif isinstance(v, list): for d in v: - nested_files.extend( - get_list_of_nested_files(d, dirpath)) + nested_files.extend(get_list_of_nested_files(d, dirpath)) return nested_files -def check_for_invalid_nesting(yml, yaml_file, dirpath): - ''' - return a list of all nested files - ''' - if not hasattr(yml, 'items'): - return [] - invalid_nesting = [] - p = re.compile('^[A-z]*::[A-z]*::[A-z]*$') +def get_nesting(yaml_files): + """return bad, files, heat, depths + bad - list of error messages. + files - dict: key is filename, value is dict of nested files. + This is the tree. + heat - dict,: key is filename, value is Heat instance. 
+ depths - dict: key is filename, value is a depth tuple - for v in yml.values(): - if isinstance(v, dict) and "type" in v: - t = v["type"] - if t.endswith(".yml") or t.endswith(".yaml"): - filepath = path.join(dirpath, t) - elif t == "OS::Heat::ResourceGroup": - rd = v["properties"]["resource_def"] - if not isinstance(rd, dict) or "type" not in rd: - invalid_nesting.append(yaml_file) - continue - elif not p.match(rd["type"]): - filepath = path.join(dirpath, rd["type"]) - else: - continue - else: - continue - try: - with open(filepath) as fh: - yml = yaml.load(fh) - except yaml.YAMLError as e: - invalid_nesting.append(filepath) - print(e) # pylint: disable=superfluous-parens - invalid_nesting.extend(check_for_invalid_nesting( - yml, - filepath, - dirpath)) - if isinstance(v, dict): - invalid_nesting.extend(check_for_invalid_nesting( - v, - yaml_file, - dirpath)) - elif isinstance(v, list): - for d in v: - invalid_nesting.extend(check_for_invalid_nesting( - d, - yaml_file, - dirpath)) - return invalid_nesting + level: 0 1 2 3 + file: template -> nested -> nested -> nested + depth: 3 2 1 0 + """ + bad = [] + files = {} + heat = {} + depths = {} + for yaml_file in yaml_files: + dirname, basename = path.split(yaml_file) + h = Heat(filepath=yaml_file) + heat[basename] = h + files[basename] = get_dict_of_nested_files(h.yml, dirname) + for filename in files: + depths[filename] = _get_nesting_depth_start(0, filename, files, []) + for depth in depths[filename]: + if depth[0] > MAX_DEPTH: + bad.append("{} {}".format(filename, str(depth[1]))) + return bad, files, heat, depths + + +def _get_nesting_depth_start(depth, filename, files, context): + depths = [] + for rid, nf in files[filename].items(): + depths.append(_get_nesting_depth(1, nf, files, context)) + return depths + + +def _get_nesting_depth(depth, filename, files, context): + """Return a depth tuple (max_depth, current_context). + `context` is the list of filenames. + `depth` is the length of `context`. + Finds the max_depth of all the resources of `filename`. + current_context is the updated list of filenames + and max_depth is its length. + """ + max_depth = depth + 1 + current_context = context + [filename] + if depth <= MAX_DEPTH: + nested_filenames = files.get(filename, {}) + if nested_filenames: + max_depth, current_context = max( + _get_nesting_depth(depth + 1, nested_filename, files, current_context) + for nested_filename in nested_filenames.values() + ) + return max_depth, current_context + +def get_resourcegroup_nested_files(yml, dirpath): + """ + return a dict. + key: key in yml which references a nested ResourceGroup file. + (resource->type is ResourceGroup + and resource->properties->resource_def->type is a yaml file) + value: the nested file name. + + The keys are assumed to be unique across files. + A separate test checks for that. + """ + + if not hasattr(yml, "get"): + return {} + + nested_files = {} + for rid, r in yml.get("resources", {}).items(): + if isinstance(r, dict) and "type" in r: + t = r["type"] + nested_file = None + if t == "OS::Heat::ResourceGroup": + rdt = r.get("properties", {}).get("resource_def", {}).get("type", None) + if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")): + nested_file = rdt + if nested_file: + filepath = path.join(dirpath, nested_file) + if path.exists(filepath): + nested_files[rid] = nested_file + return nested_files + + +def get_type_nested_files(yml, dirpath): + """ + return a dict. + key: key in yml which references a nested type file. + (the resource "type" is a yaml file.) 
+ value: the nested file name. + + The keys are assumed to be unique across files. + A separate test checks for that. + """ + + if not hasattr(yml, "get"): + return {} + + nested_files = {} + for rid, r in yml.get("resources", {}).items(): + if isinstance(r, dict) and "type" in r: + t = r["type"] + nested_file = None + if t.endswith(".yml") or t.endswith(".yaml"): + nested_file = t + if nested_file: + filepath = path.join(dirpath, nested_file) + if path.exists(filepath): + nested_files[rid] = nested_file + return nested_files diff --git a/ice_validator/tests/utils/nested_iterables.py b/ice_validator/tests/utils/nested_iterables.py index b6deaba..0768339 100644 --- a/ice_validator/tests/utils/nested_iterables.py +++ b/ice_validator/tests/utils/nested_iterables.py @@ -40,10 +40,10 @@ def parse_nested_dict(d, key=""): - ''' + """ parse the nested dictionary and return values of given key of function parameter only - ''' + """ nested_elements = [] for k, v in d.items(): if isinstance(v, dict): @@ -60,29 +60,27 @@ def parse_nested_dict(d, key=""): def find_all_get_param_in_yml(yml): - ''' + """ Recursively find all referenced parameters in a parsed yaml body and return a list of parameters - ''' - os_pseudo_parameters = ['OS::stack_name', - 'OS::stack_id', - 'OS::project_id'] + """ + os_pseudo_parameters = ["OS::stack_name", "OS::stack_id", "OS::project_id"] - if not hasattr(yml, 'items'): + if not hasattr(yml, "items"): return [] params = [] for k, v in yml.items(): - if k == 'get_param' and v not in os_pseudo_parameters: + if k == "get_param" and v not in os_pseudo_parameters: if isinstance(v, list) and not isinstance(v[0], dict): params.append(v[0]) elif not isinstance(v, dict) and isinstance(v, str): params.append(v) - for item in (v if isinstance(v, list) else [v]): + for item in v if isinstance(v, list) else [v]: if isinstance(item, dict): params.extend(find_all_get_param_in_yml(item)) continue - elif k == 'list_join': - for item in (v if isinstance(v, list) else [v]): + elif k == "list_join": + for item in v if isinstance(v, list) else [v]: if isinstance(item, list): for d in item: params.extend(find_all_get_param_in_yml(d)) @@ -96,15 +94,15 @@ def find_all_get_param_in_yml(yml): def find_all_get_resource_in_yml(yml): - ''' + """ Recursively find all referenced resources in a parsed yaml body and return a list of resource ids - ''' - if not hasattr(yml, 'items'): + """ + if not hasattr(yml, "items"): return [] resources = [] for k, v in yml.items(): - if k == 'get_resource': + if k == "get_resource": if isinstance(v, list): resources.append(v[0]) else: @@ -119,15 +117,15 @@ def find_all_get_resource_in_yml(yml): def find_all_get_file_in_yml(yml): - ''' + """ Recursively find all get_file in a parsed yaml body and return the list of referenced files/urls - ''' - if not hasattr(yml, 'items'): + """ + if not hasattr(yml, "items"): return [] resources = [] for k, v in yml.items(): - if k == 'get_file': + if k == "get_file": if isinstance(v, list): resources.append(v[0]) else: @@ -142,37 +140,35 @@ def find_all_get_file_in_yml(yml): def find_all_get_resource_in_resource(resource): - ''' + """ Recursively find all referenced resources in a heat resource and return a list of resource ids - ''' - if not hasattr(resource, 'items'): + """ + if not hasattr(resource, "items"): return [] resources = [] for k, v in resource.items(): - if k == 'get_resource': + if k == "get_resource": if isinstance(v, list): resources.append(v[0]) else: resources.append(v) continue if isinstance(v, dict): - 
resources.extend( - find_all_get_resource_in_resource(v)) + resources.extend(find_all_get_resource_in_resource(v)) elif isinstance(v, list): for d in v: - resources.extend( - find_all_get_resource_in_resource(d)) + resources.extend(find_all_get_resource_in_resource(d)) return resources def get_associated_resources_per_resource(resources): - ''' + """ Recursively find all referenced resources for each resource in a list of resource ids - ''' - if not hasattr(resources, 'items'): + """ + if not hasattr(resources, "items"): return None resources_dict = {} @@ -183,8 +179,7 @@ def get_associated_resources_per_resource(resources): get_resources = [] for k, v in res_value: - if k == 'get_resource' and\ - isinstance(v, dict): + if k == "get_resource" and isinstance(v, dict): get_resources = find_all_get_resource_in_resource(v) # if resources found, add to dict @@ -201,9 +196,9 @@ def get_associated_resources_per_resource(resources): def flatten(items): - ''' + """ flatten items from any nested iterable - ''' + """ merged_list = [] for item in items: diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py index bed3a5a..ffb9870 100644 --- a/ice_validator/tests/utils/network_roles.py +++ b/ice_validator/tests/utils/network_roles.py @@ -41,111 +41,114 @@ import re import socket - -def get_network_role_from_port(resource): - ''' - get the network role from a neutron port resource - ''' +PARAM_FORMATS = [ + ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], + ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], + ["network", "string", "external", re.compile(r"(.+?)_net_id")], + ["network", "string", "external", re.compile(r"(.+?)_net_name")], +] + +RESOURCE_FORMATS = [ + re.compile(r"int_(.+?)_network"), # OS::ContrailV2::VirtualNetwork + re.compile(r"int_(.+?)_RVN"), # OS::ContrailV2::VirtualNetwork + re.compile(r"int_(.+?)"), # OS::Neutron::Net +] + + +def get_network_role_and_type(resource): + """ + Derive the network role and type (internal vs. external) from an + OS::Neutron::Port. + + :param resource: dict of Resource attributes + :return: tuple of (network_role, network_type) where network_type is + 'internal' or 'external'. Returns (None, None) if resource + is not a port or the values cannot be derived. 
+ """ if not isinstance(resource, dict): - return None - if 'type' not in resource: - return None - if resource['type'] != 'OS::Neutron::Port': - return None - if 'properties' not in resource: - return None - - formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')]] - - for k1, v1 in resource["properties"].items(): - if k1 != 'network': - continue - - # get the network id or name - network = ( - v1.get('get_param') or - v1.get('get_resource')) - if not network: - continue - - for v2 in formats: - m = v2[3].match(network) + return None, None + if resource.get("type", "") != "OS::Neutron::Port": + return None, None + + network_props = resource.get("properties", {}).get("network", {}) + is_resource = "get_resource" in network_props + if is_resource: + network = network_props.get("get_resource", "") + else: + network = network_props.get("get_param", "") + + if is_resource: # connecting to an network in the template + for format in RESOURCE_FORMATS: + m = format.match(network) if m and m.group(1): - return m.group(1) - - return None + return m.group(1), "internal" + else: + for format in PARAM_FORMATS: + m = format[3].match(network) + if m and m.group(1): + return m.group(1), format[2] + return None, None -def get_network_roles(resources): - network_roles = [] +def get_network_role_from_port(resource): + """ + Get the network-role from a OS::Neutron::Port resource. Returns None + if resource is not a port or the network-role cannot be derived + """ + return get_network_role_and_type(resource)[0] + + +def get_network_roles(resources, of_type=""): + """ + Returns the network roles derived from the OS::Neutron::Port resources + in the collection of ``resources``. 
If ``of_type`` is not specified + then all network roles will be returned, or ``external`` or ``internal`` + can be passed to select only those network roles + + :param resources: collection of resource attributes (dict) + :param of_type: "internal" or "external" + :return: set of network roles discovered + """ + valid_of_type = ("", "external", "internal") + if of_type not in ("", "external", "internal"): + raise RuntimeError("of_type must one of " + ", ".join(valid_of_type)) + network_roles = set() for v in resources.values(): - nr = get_network_role_from_port(v) - if nr: - network_roles.append(nr) - - return set(network_roles) + nr, nt = get_network_role_and_type(v) + if not nr: + continue + if not of_type: + network_roles.add(nr) + elif of_type and of_type == nt: + network_roles.add(nr) + return network_roles def get_network_type_from_port(resource): - ''' - get the network type from a neutron port resource - ''' - if not isinstance(resource, dict): - return None - if 'type' not in resource: - return None - if resource['type'] != 'OS::Neutron::Port': - return None - if 'properties' not in resource: - return None - - formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')]] - - for k1, v1 in resource["properties"].items(): - if k1 != 'network': - continue - if "get_param" not in v1: - continue - for v2 in formats: - m = v2[3].match(v1["get_param"]) - if m and m.group(1): - return v2[2] - - return None + """ + Get the network-type (internal or external) from an OS::Neutron::Port + resource. Returns None if the resource is not a port or the type + cannot be derived. 
+ """ + return get_network_role_and_type(resource)[1] -def is_valid_ip_address(ip_address, ip_type='ipv4'): - ''' +def is_valid_ip_address(ip_address, ip_type="ipv4"): + """ check if an ip address is valid - ''' - if ip_type == 'ipv4': + """ + if ip_type == "ipv4": return is_valid_ipv4_address(ip_address) - elif ip_type == 'ipv6': + elif ip_type == "ipv6": return is_valid_ipv6_address(ip_address) return False def is_valid_ipv4_address(ip_address): - ''' + """ check if an ip address of the type ipv4 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET, ip_address) except AttributeError: @@ -153,17 +156,17 @@ def is_valid_ipv4_address(ip_address): socket.inet_aton(ip_address) except (OSError, socket.error): return False - return ip_address.count('.') == 3 + return ip_address.count(".") == 3 except (OSError, socket.error): return False return True def is_valid_ipv6_address(ip_address): - ''' + """ check if an ip address of the type ipv6 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET6, ip_address) except (OSError, socket.error): @@ -172,17 +175,17 @@ def is_valid_ipv6_address(ip_address): def property_uses_get_resource(resource, property_name): - ''' + """ returns true if a port's network property uses the get_resource function - ''' + """ if not isinstance(resource, dict): return False - if 'properties' not in resource: + if "properties" not in resource: return False for k1, v1 in resource["properties"].items(): if k1 != property_name: continue - if "get_resource" in v1: + if isinstance(v1, dict) and "get_resource" in v1: return True return False diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index a2ae8a9..4029d3c 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -43,53 +43,96 @@ from .vm_types import get_vm_type_for_nova_server import re -def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type): - ''' +def is_valid_ip_address( + ip_address, vm_type, network_role, port_property, parameter_type +): + """ Check the ip_address to make sure it is properly formatted and also contains {vm_type} and {network_role} - ''' + """ allowed_formats = [ - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_ips')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["fixed_ips", "string", "external", - 
re.compile(r'(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_ips')]] + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_ip_\d+"), + ], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_ips"), + ], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")], + ] for v3 in allowed_formats: if v3[1] != parameter_type: @@ -99,33 +142,30 @@ def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parame # check if pattern matches m = v3[3].match(ip_address) if m: - if (v3[2] == "internal" and - len(m.groups()) > 1): - return m.group(1) == vm_type and\ - m.group(2) == network_role - elif (v3[2] == "external" and - len(m.groups()) > 0): + if v3[2] == "internal" and len(m.groups()) > 1: + return m.group(1) == vm_type and m.group(2) == network_role + elif v3[2] == "external" and len(m.groups()) > 0: return m.group(1) == vm_type + "_" + network_role return False def get_invalid_ip_addresses(resources, port_property, parameters): - ''' + """ Get a list of valid ip addresses for a heat resources section - ''' + """ invalid_ip_addresses = [] for k, v in resources.items(): if not isinstance(v, dict): continue - if 'type' not in v: + if "type" not in v: continue - if v['type'] not in 'OS::Nova::Server': + if v["type"] not in "OS::Nova::Server": continue - if 'properties' not in v: + if "properties" not in v: continue - if 'networks' not in v['properties']: + if "networks" not in v["properties"]: continue port_resource = None @@ -135,16 
+175,16 @@ def get_invalid_ip_addresses(resources, port_property, parameters): continue # get all ports associated with the nova server - properties = v['properties'] - for network in properties['networks']: + properties = v["properties"] + for network in properties["networks"]: for k3, v3 in network.items(): - if k3 != 'port': + if k3 != "port": continue if not isinstance(v3, dict): continue - if 'get_resource' in v3: - port_id = v3['get_resource'] + if "get_resource" in v3: + port_id = v3["get_resource"] if not resources[port_id]: continue port_resource = resources[port_id] @@ -175,11 +215,13 @@ def get_invalid_ip_addresses(resources, port_property, parameters): if not parameter_type: continue - valid_ip_address = is_valid_ip_address(ip_address, - vm_type, - network_role, - port_property, - parameter_type) + valid_ip_address = is_valid_ip_address( + ip_address, + vm_type, + network_role, + port_property, + parameter_type, + ) if not valid_ip_address: invalid_ip_addresses.append(ip_address) @@ -187,18 +229,17 @@ def get_invalid_ip_addresses(resources, port_property, parameters): return invalid_ip_addresses -def is_reserved_port(port_id): - ''' - Checks to see if the resource id for a port follows - the reserve port concept - ''' - formats = [ - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_ip_\d+')], - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]] - for f in formats: - m = f[1].match(port_id.lower()) - if m and m.group(1): - return True - return False +def get_list_of_ports_attached_to_nova_server(nova_server): + networks_list = nova_server.get("properties", {}).get("networks") + + port_ids = [] + if networks_list: + for network in networks_list: + network_prop = network.get("port") + if network_prop: + pid = network_prop.get("get_param") + if not pid: + pid = network_prop.get("get_resource") + port_ids.append(pid) + + return port_ids diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py index 6802666..78006b9 100644 --- a/ice_validator/tests/utils/vm_types.py +++ b/ice_validator/tests/utils/vm_types.py @@ -42,36 +42,33 @@ import re def get_vm_types_for_resource(resource): - ''' + """ Get all unique vm_types for a resource Notes: - Returns set([]) if the resource is not formatted properly, the passed resource is not a nova server - If more than one vm_type is detected all vm_types will be returned - ''' + """ if not isinstance(resource, dict): return set() - if 'type' not in resource: + if "type" not in resource: return set() - if resource['type'] != 'OS::Nova::Server': + if resource["type"] != "OS::Nova::Server": return set() - if 'properties' not in resource: + if "properties" not in resource: return set() key_values = ["name", "flavor", "image"] key_value_formats = [ - ["name", "string", - re.compile(r'(.+?)_name_\d+')], - ["name", "comma_delimited_list", - re.compile(r'(.+?)_names')], - ["flavor", "string", - re.compile(r'(.+?)_flavor_name')], - ["image", "string", - re.compile(r'(.+?)_image_name')]] + ["name", "string", re.compile(r"(.+?)_name_\d+")], + ["name", "comma_delimited_list", re.compile(r"(.+?)_names")], + ["flavor", "string", re.compile(r"(.+?)_flavor_name")], + ["image", "string", re.compile(r"(.+?)_image_name")], + ] vm_types = [] - for k2, v2 in resource['properties'].items(): + for k2, v2 in resource["properties"].items(): if k2 not in key_values: continue if "get_param" not in v2: @@ -89,12 +86,12 @@ def get_vm_types_for_resource(resource): def get_vm_type_for_nova_server(resource): - ''' + """ Get 
the vm_type for a resource Note: Returns None if not exactly one vm_type is detected, if the resource is not formatted properly, or the passed resource is not a nova server - ''' + """ vm_types = get_vm_types_for_resource(resource) # if more than one vm_type was identified, return None @@ -105,10 +102,10 @@ def get_vm_type_for_nova_server(resource): def get_vm_types(resources): - ''' + """ Get all vm_types for a list of heat resources, do note that some of the values retrieved may be invalid - ''' + """ vm_types = [] for v in resources.values(): vm_types.extend(list(get_vm_types_for_resource(v))) diff --git a/ice_validator/tests/utils/volumes.py b/ice_validator/tests/utils/volumes.py index 40731bf..6f52dd8 100644 --- a/ice_validator/tests/utils/volumes.py +++ b/ice_validator/tests/utils/volumes.py @@ -44,19 +44,19 @@ from os import path from tests import cached_yaml as yaml -VERSION = '1.0.0' +VERSION = "1.0.0" def get_volume_resources(heat_template): - ''' + """ get the resources from the volume template Note: Returns an empty dict if there is no volume template or for any other error - ''' + """ basename = path.splitext(heat_template)[0] - for ext in ['.yaml', '.yml']: - volume_template = basename + '_volume' + ext + for ext in [".yaml", ".yml"]: + volume_template = basename + "_volume" + ext if path.isfile(volume_template): break else: @@ -66,12 +66,12 @@ def get_volume_resources(heat_template): with open(volume_template) as fh: yml = yaml.load(fh) except yaml.YAMLError as e: - print(e) # pylint: disable=superfluous-parens + print(e) # pylint: disable=superfluous-parens return {} - if 'outputs' not in yml: + if "outputs" not in yml: return {} - if 'resources' not in yml: + if "resources" not in yml: return {} - return yml['resources'] + return yml["resources"] diff --git a/ice_validator/tests/utils/yaml_custom_utils.py b/ice_validator/tests/utils/yaml_custom_utils.py index 8a5006c..1e4b0e5 100644 --- a/ice_validator/tests/utils/yaml_custom_utils.py +++ b/ice_validator/tests/utils/yaml_custom_utils.py @@ -50,8 +50,11 @@ def raise_duplicates_keys(loader, node, deep=False): value = loader.construct_object(value_node, deep=deep) if key in mapping: raise ConstructorError( - "while constructing a mapping", node.start_mark, - "found duplicate key (%s)" % key, key_node.start_mark) + "while constructing a mapping", + node.start_mark, + "found duplicate key (%s)" % key, + key_node.start_mark, + ) mapping[key] = value return loader.construct_mapping(node, deep) -- cgit 1.2.3-korg
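A short usage sketch of the updated nested_dict.get() helper, assuming the package layout shown above (tests.utils.nested_dict, imported the same way the patched modules import "from tests import cached_yaml"); the sample heat snippet and resource/parameter names are made up for illustration only.

    # Hypothetical caller of the patched nested_dict.get(); the new
    # "default=" keyword is returned when any key along the path is missing.
    from tests.utils import nested_dict

    heat = {
        "resources": {
            "oam_server_0": {
                "type": "OS::Nova::Server",
                "properties": {"flavor": {"get_param": "oam_flavor_name"}},
            }
        }
    }

    # Walks resources -> oam_server_0 -> type
    nested_dict.get(heat, "resources", "oam_server_0", "type")
    # -> "OS::Nova::Server"

    # Missing key: None by default, or the supplied default value
    nested_dict.get(heat, "resources", "oam_server_0", "metadata")
    # -> None
    nested_dict.get(heat, "resources", "missing_id", "properties", default={})
    # -> {}

Passing a dict as the default keeps chained lookups safe, since each missing level still supports .get(); a non-dict default is returned as soon as the walk falls off the dictionary.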