aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils/ports.py
diff options
context:
space:
mode:
Diffstat (limited to 'ice_validator/tests/utils/ports.py')
-rw-r--r--ice_validator/tests/utils/ports.py189
1 files changed, 81 insertions, 108 deletions
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index e479201..a2ae8a9 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -43,125 +43,89 @@ from .vm_types import get_vm_type_for_nova_server
import re
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
- """
+def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type):
+ '''
Check the ip_address to make sure it is properly formatted and
also contains {vm_type} and {network_role}
- """
+ '''
allowed_formats = [
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
- ],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_ips"),
- ],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
- ]
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_floating_ip')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_floating_v6_ip')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_floating_ip')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_v6_ip_\d+')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_ip_\d+')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "external", re.compile(r'(.+?)_v6_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "external", re.compile(r'(.+?)_ips')],
+ ["fixed_ips", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
+ ["fixed_ips", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
+ ["fixed_ips", "string", "external",
+ re.compile(r'(.+?)_v6_ip_\d+')],
+ ["fixed_ips", "string", "external",
+ re.compile(r'(.+?)_ip_\d+')],
+ ["fixed_ips", "comma_delimited_list", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ips')],
+ ["fixed_ips", "comma_delimited_list", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ips')],
+ ["fixed_ips", "comma_delimited_list", "external",
+ re.compile(r'(.+?)_v6_ips')],
+ ["fixed_ips", "comma_delimited_list", "external",
+ re.compile(r'(.+?)_ips')]]
for v3 in allowed_formats:
+ if v3[1] != parameter_type:
+ continue
if v3[0] != port_property:
continue
# check if pattern matches
m = v3[3].match(ip_address)
if m:
- if v3[2] == "internal" and len(m.groups()) > 1:
- return m.group(1) == vm_type and m.group(2) == network_role
- elif v3[2] == "external" and len(m.groups()) > 0:
+ if (v3[2] == "internal" and
+ len(m.groups()) > 1):
+ return m.group(1) == vm_type and\
+ m.group(2) == network_role
+ elif (v3[2] == "external" and
+ len(m.groups()) > 0):
return m.group(1) == vm_type + "_" + network_role
return False
-def get_invalid_ip_addresses(resources, port_property):
- """
+def get_invalid_ip_addresses(resources, port_property, parameters):
+ '''
Get a list of valid ip addresses for a heat resources section
- """
+ '''
invalid_ip_addresses = []
for k, v in resources.items():
if not isinstance(v, dict):
continue
- if "type" not in v:
+ if 'type' not in v:
continue
- if v["type"] not in "OS::Nova::Server":
+ if v['type'] not in 'OS::Nova::Server':
continue
- if "properties" not in v:
+ if 'properties' not in v:
continue
- if "networks" not in v["properties"]:
+ if 'networks' not in v['properties']:
continue
port_resource = None
@@ -171,16 +135,16 @@ def get_invalid_ip_addresses(resources, port_property):
continue
# get all ports associated with the nova server
- properties = v["properties"]
- for network in properties["networks"]:
+ properties = v['properties']
+ for network in properties['networks']:
for k3, v3 in network.items():
- if k3 != "port":
+ if k3 != 'port':
continue
if not isinstance(v3, dict):
continue
- if "get_resource" in v3:
- port_id = v3["get_resource"]
+ if 'get_resource' in v3:
+ port_id = v3['get_resource']
if not resources[port_id]:
continue
port_resource = resources[port_id]
@@ -199,15 +163,23 @@ def get_invalid_ip_addresses(resources, port_property):
continue
if "get_param" not in v2["ip_address"]:
continue
-
ip_address = v2["ip_address"]["get_param"]
if isinstance(ip_address, list):
ip_address = ip_address[0]
- valid_ip_address = is_valid_ip_address(
- ip_address, vm_type, network_role, port_property
- )
+ if ip_address not in parameters:
+ continue
+
+ parameter_type = parameters[ip_address].get("type")
+ if not parameter_type:
+ continue
+
+ valid_ip_address = is_valid_ip_address(ip_address,
+ vm_type,
+ network_role,
+ port_property,
+ parameter_type)
if not valid_ip_address:
invalid_ip_addresses.append(ip_address)
@@ -216,14 +188,15 @@ def get_invalid_ip_addresses(resources, port_property):
def is_reserved_port(port_id):
- """
+ '''
Checks to see if the resource id for a port follows
the reserve port concept
- """
+ '''
formats = [
- ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")],
- ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")],
- ]
+ ["port_id",
+ re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
+ ["port_id",
+ re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]]
for f in formats:
m = f[1].match(port_id.lower())
if m and m.group(1):