diff options
Diffstat (limited to 'ice_validator/tests/utils/network_roles.py')
-rw-r--r-- | ice_validator/tests/utils/network_roles.py | 189 |
1 files changed, 96 insertions, 93 deletions
diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py index bed3a5a..ffb9870 100644 --- a/ice_validator/tests/utils/network_roles.py +++ b/ice_validator/tests/utils/network_roles.py @@ -41,111 +41,114 @@ import re import socket - -def get_network_role_from_port(resource): - ''' - get the network role from a neutron port resource - ''' +PARAM_FORMATS = [ + ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")], + ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")], + ["network", "string", "external", re.compile(r"(.+?)_net_id")], + ["network", "string", "external", re.compile(r"(.+?)_net_name")], +] + +RESOURCE_FORMATS = [ + re.compile(r"int_(.+?)_network"), # OS::ContrailV2::VirtualNetwork + re.compile(r"int_(.+?)_RVN"), # OS::ContrailV2::VirtualNetwork + re.compile(r"int_(.+?)"), # OS::Neutron::Net +] + + +def get_network_role_and_type(resource): + """ + Derive the network role and type (internal vs. external) from an + OS::Neutron::Port. + + :param resource: dict of Resource attributes + :return: tuple of (network_role, network_type) where network_type is + 'internal' or 'external'. Returns (None, None) if resource + is not a port or the values cannot be derived. + """ if not isinstance(resource, dict): - return None - if 'type' not in resource: - return None - if resource['type'] != 'OS::Neutron::Port': - return None - if 'properties' not in resource: - return None - - formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')]] - - for k1, v1 in resource["properties"].items(): - if k1 != 'network': - continue - - # get the network id or name - network = ( - v1.get('get_param') or - v1.get('get_resource')) - if not network: - continue - - for v2 in formats: - m = v2[3].match(network) + return None, None + if resource.get("type", "") != "OS::Neutron::Port": + return None, None + + network_props = resource.get("properties", {}).get("network", {}) + is_resource = "get_resource" in network_props + if is_resource: + network = network_props.get("get_resource", "") + else: + network = network_props.get("get_param", "") + + if is_resource: # connecting to an network in the template + for format in RESOURCE_FORMATS: + m = format.match(network) if m and m.group(1): - return m.group(1) - - return None + return m.group(1), "internal" + else: + for format in PARAM_FORMATS: + m = format[3].match(network) + if m and m.group(1): + return m.group(1), format[2] + return None, None -def get_network_roles(resources): - network_roles = [] +def get_network_role_from_port(resource): + """ + Get the network-role from a OS::Neutron::Port resource. Returns None + if resource is not a port or the network-role cannot be derived + """ + return get_network_role_and_type(resource)[0] + + +def get_network_roles(resources, of_type=""): + """ + Returns the network roles derived from the OS::Neutron::Port resources + in the collection of ``resources``. If ``of_type`` is not specified + then all network roles will be returned, or ``external`` or ``internal`` + can be passed to select only those network roles + + :param resources: collection of resource attributes (dict) + :param of_type: "internal" or "external" + :return: set of network roles discovered + """ + valid_of_type = ("", "external", "internal") + if of_type not in ("", "external", "internal"): + raise RuntimeError("of_type must one of " + ", ".join(valid_of_type)) + network_roles = set() for v in resources.values(): - nr = get_network_role_from_port(v) - if nr: - network_roles.append(nr) - - return set(network_roles) + nr, nt = get_network_role_and_type(v) + if not nr: + continue + if not of_type: + network_roles.add(nr) + elif of_type and of_type == nt: + network_roles.add(nr) + return network_roles def get_network_type_from_port(resource): - ''' - get the network type from a neutron port resource - ''' - if not isinstance(resource, dict): - return None - if 'type' not in resource: - return None - if resource['type'] != 'OS::Neutron::Port': - return None - if 'properties' not in resource: - return None - - formats = [ - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_id')], - ["network", "string", "internal", - re.compile(r'int_(.+?)_net_name')], - ["network", "string", "external", - re.compile(r'(.+?)_net_id')], - ["network", "string", "external", - re.compile(r'(.+?)_net_name')]] - - for k1, v1 in resource["properties"].items(): - if k1 != 'network': - continue - if "get_param" not in v1: - continue - for v2 in formats: - m = v2[3].match(v1["get_param"]) - if m and m.group(1): - return v2[2] - - return None + """ + Get the network-type (internal or external) from an OS::Neutron::Port + resource. Returns None if the resource is not a port or the type + cannot be derived. + """ + return get_network_role_and_type(resource)[1] -def is_valid_ip_address(ip_address, ip_type='ipv4'): - ''' +def is_valid_ip_address(ip_address, ip_type="ipv4"): + """ check if an ip address is valid - ''' - if ip_type == 'ipv4': + """ + if ip_type == "ipv4": return is_valid_ipv4_address(ip_address) - elif ip_type == 'ipv6': + elif ip_type == "ipv6": return is_valid_ipv6_address(ip_address) return False def is_valid_ipv4_address(ip_address): - ''' + """ check if an ip address of the type ipv4 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET, ip_address) except AttributeError: @@ -153,17 +156,17 @@ def is_valid_ipv4_address(ip_address): socket.inet_aton(ip_address) except (OSError, socket.error): return False - return ip_address.count('.') == 3 + return ip_address.count(".") == 3 except (OSError, socket.error): return False return True def is_valid_ipv6_address(ip_address): - ''' + """ check if an ip address of the type ipv6 is valid - ''' + """ try: socket.inet_pton(socket.AF_INET6, ip_address) except (OSError, socket.error): @@ -172,17 +175,17 @@ def is_valid_ipv6_address(ip_address): def property_uses_get_resource(resource, property_name): - ''' + """ returns true if a port's network property uses the get_resource function - ''' + """ if not isinstance(resource, dict): return False - if 'properties' not in resource: + if "properties" not in resource: return False for k1, v1 in resource["properties"].items(): if k1 != property_name: continue - if "get_resource" in v1: + if isinstance(v1, dict) and "get_resource" in v1: return True return False |