aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils/ports.py
diff options
context:
space:
mode:
Diffstat (limited to 'ice_validator/tests/utils/ports.py')
-rw-r--r--ice_validator/tests/utils/ports.py201
1 files changed, 121 insertions, 80 deletions
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index a2ae8a9..4029d3c 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -43,53 +43,96 @@ from .vm_types import get_vm_type_for_nova_server
import re
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type):
- '''
+def is_valid_ip_address(
+ ip_address, vm_type, network_role, port_property, parameter_type
+):
+ """
Check the ip_address to make sure it is properly formatted and
also contains {vm_type} and {network_role}
- '''
+ """
allowed_formats = [
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_ips')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_ips')]]
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
+ ],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_ips"),
+ ],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
+ ]
for v3 in allowed_formats:
if v3[1] != parameter_type:
@@ -99,33 +142,30 @@ def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parame
# check if pattern matches
m = v3[3].match(ip_address)
if m:
- if (v3[2] == "internal" and
- len(m.groups()) > 1):
- return m.group(1) == vm_type and\
- m.group(2) == network_role
- elif (v3[2] == "external" and
- len(m.groups()) > 0):
+ if v3[2] == "internal" and len(m.groups()) > 1:
+ return m.group(1) == vm_type and m.group(2) == network_role
+ elif v3[2] == "external" and len(m.groups()) > 0:
return m.group(1) == vm_type + "_" + network_role
return False
def get_invalid_ip_addresses(resources, port_property, parameters):
- '''
+ """
Get a list of valid ip addresses for a heat resources section
- '''
+ """
invalid_ip_addresses = []
for k, v in resources.items():
if not isinstance(v, dict):
continue
- if 'type' not in v:
+ if "type" not in v:
continue
- if v['type'] not in 'OS::Nova::Server':
+ if v["type"] not in "OS::Nova::Server":
continue
- if 'properties' not in v:
+ if "properties" not in v:
continue
- if 'networks' not in v['properties']:
+ if "networks" not in v["properties"]:
continue
port_resource = None
@@ -135,16 +175,16 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
continue
# get all ports associated with the nova server
- properties = v['properties']
- for network in properties['networks']:
+ properties = v["properties"]
+ for network in properties["networks"]:
for k3, v3 in network.items():
- if k3 != 'port':
+ if k3 != "port":
continue
if not isinstance(v3, dict):
continue
- if 'get_resource' in v3:
- port_id = v3['get_resource']
+ if "get_resource" in v3:
+ port_id = v3["get_resource"]
if not resources[port_id]:
continue
port_resource = resources[port_id]
@@ -175,11 +215,13 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
if not parameter_type:
continue
- valid_ip_address = is_valid_ip_address(ip_address,
- vm_type,
- network_role,
- port_property,
- parameter_type)
+ valid_ip_address = is_valid_ip_address(
+ ip_address,
+ vm_type,
+ network_role,
+ port_property,
+ parameter_type,
+ )
if not valid_ip_address:
invalid_ip_addresses.append(ip_address)
@@ -187,18 +229,17 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
return invalid_ip_addresses
-def is_reserved_port(port_id):
- '''
- Checks to see if the resource id for a port follows
- the reserve port concept
- '''
- formats = [
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]]
- for f in formats:
- m = f[1].match(port_id.lower())
- if m and m.group(1):
- return True
- return False
+def get_list_of_ports_attached_to_nova_server(nova_server):
+ networks_list = nova_server.get("properties", {}).get("networks")
+
+ port_ids = []
+ if networks_list:
+ for network in networks_list:
+ network_prop = network.get("port")
+ if network_prop:
+ pid = network_prop.get("get_param")
+ if not pid:
+ pid = network_prop.get("get_resource")
+ port_ids.append(pid)
+
+ return port_ids