aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils/ports.py
diff options
context:
space:
mode:
Diffstat (limited to 'ice_validator/tests/utils/ports.py')
-rw-r--r--ice_validator/tests/utils/ports.py289
1 files changed, 105 insertions, 184 deletions
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index 89440eb..c005e32 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -36,199 +36,120 @@
# ============LICENSE_END============================================
#
#
-
from .network_roles import get_network_role_and_type
-from .vm_types import get_vm_type_for_nova_server
-import re
-
-
-def is_valid_ip_address(
- ip_address, vm_type, network_role, port_property, parameter_type, network_type
-):
- """
- Check the ip_address to make sure it is properly formatted and
- also contains {vm_type} and {network_role}
- """
-
- allowed_formats = [
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
- ],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_ips"),
- ],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
- ]
-
- for v3 in allowed_formats:
- if v3[1] != parameter_type:
- continue
- if v3[0] != port_property:
- continue
- if v3[2] != network_type:
- continue
- # check if pattern matches
- m = v3[3].match(ip_address)
- if m:
- if v3[2] == "internal" and len(m.groups()) > 1:
- return m.group(1) == vm_type and m.group(2) == network_role
- elif v3[2] == "external" and len(m.groups()) > 0:
- return m.group(1) == vm_type + "_" + network_role
+from tests.structures import Heat, NeutronPortProcessor
+from tests.helpers import parameter_type_to_heat_type
+from . import nested_dict
- return False
-
-def get_invalid_ip_addresses(resources, port_property, parameters):
+def check_ip_format(yaml_file, regx, port_type, resource_property, nested_property):
"""
- Get a list of valid ip addresses for a heat resources section
+ yaml_file: input file to check
+ regx: dictionary containing the regex to use to validate parameter
+ port_type: internal or external
+ resource_property: OS::Neutron::Port property to check for parameter
+ nested_property: resource_property will be a list of dicts, this is the key to index into
"""
- invalid_ip_addresses = []
-
- for k, v in resources.items():
- if not isinstance(v, dict):
- continue
- if "type" not in v:
+ invalid_ips = []
+ heat = Heat(filepath=yaml_file)
+ ports = heat.get_resource_by_type("OS::Neutron::Port")
+ heat_parameters = heat.parameters
+
+ for rid, resource in ports.items():
+ network_role, network_type = get_network_role_and_type(resource)
+ if (
+ network_type != port_type
+ ): # skipping if port type (internal/external) doesn't match
continue
- if v["type"] not in "OS::Nova::Server":
- continue
- if "properties" not in v:
- continue
- if "networks" not in v["properties"]:
- continue
-
- port_resource = None
-
- vm_type = get_vm_type_for_nova_server(v)
- if not vm_type:
- continue
-
- # get all ports associated with the nova server
- properties = v["properties"]
- for network in properties["networks"]:
- for k3, v3 in network.items():
- if k3 != "port":
- continue
- if not isinstance(v3, dict):
- continue
-
- if "get_resource" in v3:
- port_id = v3["get_resource"]
- if not resources[port_id]:
- continue
- port_resource = resources[port_id]
- else:
- continue
- network_role, network_type = get_network_role_and_type(port_resource)
- if not network_role or not network_type:
+ name, port_match = NeutronPortProcessor.get_rid_match_tuple(rid)
+ if not port_match:
+ continue # port resource ID not formatted correctely
+
+ params = nested_dict.get(resource, "properties", resource_property, default={})
+
+ for param in params:
+ prop = nested_dict.get(param, nested_property)
+ if (
+ not prop
+ or not isinstance(prop, dict)
+ or "get_resource" in prop
+ or "get_attr" in prop
+ # or "str_replace" in prop - should str_replace be checked?
+ ):
+ continue # lets only check parameters shall we?
+
+ # checking parameter uses get_param
+ parameter = nested_dict.get(prop, "get_param")
+ if not parameter:
+ msg = (
+ "Unexpected parameter format for OS::Neutron::Port {} property {}: {}. "
+ + "Please consult the heat guidelines documentation for details."
+ ).format(rid, resource_property, prop)
+ invalid_ips.append(msg) # should this be a failure?
+ continue
+
+ # getting parameter if the get_param uses list, and getting official HEAT parameter type
+ parameter_type = parameter_type_to_heat_type(parameter)
+ if parameter_type == "comma_delimited_list":
+ parameter = parameter[0]
+ elif parameter_type != "string":
+ continue
+
+ # checking parameter format = type defined in template
+ heat_parameter_type = nested_dict.get(heat_parameters, parameter, "type")
+ if not heat_parameter_type or heat_parameter_type != parameter_type:
+ msg = (
+ "OS::Neutron::Port {} parameter {} defined as type {} "
+ + "is being used as type {} in the heat template"
+ ).format(
+ resource_property, parameter, heat_parameter_type, parameter_type
+ )
+ invalid_ips.append(msg)
+ continue
+
+ # if parameter type is not in regx dict, then it is not supported by automation
+ regx_dict = regx[port_type].get(parameter_type)
+ if not regx_dict:
+ msg = (
+ "WARNING: OS::Neutron::Port {} parameter {} defined as type {} "
+ + "is not supported by platform automation. If this VNF is not able "
+ + "to adhere to this requirement, please consult the Heat Orchestration "
+ + "Template guidelines for alternative solutions. If already adhering to "
+ + "an alternative provided by the heat guidelines, please disregard this "
+ + "message."
+ ).format(resource_property, parameter, parameter_type)
+ invalid_ips.append(msg)
+ continue
+
+ # checking if param adheres to guidelines format
+ regexp = regx[port_type][parameter_type]["machine"]
+ readable_format = regx[port_type][parameter_type]["readable"]
+ match = regexp.match(parameter)
+ if not match:
+ msg = "{} parameter {} does not follow format {}".format(
+ resource_property, parameter, readable_format
+ )
+ invalid_ips.append(msg)
+ continue
+
+ # checking that parameter includes correct vm_type/network_role
+ parameter_checks = regx.get("parameter_to_resource_comparisons", [])
+ for check in parameter_checks:
+ resource_match = port_match.group(check)
+ if (
+ resource_match
+ and not parameter.startswith(resource_match)
+ and parameter.find("_{}_".format(resource_match)) == -1
+ ):
+ msg = (
+ "OS::Neutron::Port {0} property {1} parameter "
+ + "{2} {3} does match resource {3} {4}"
+ ).format(rid, resource_property, parameter, check, resource_match)
+ invalid_ips.append(msg)
continue
- for k1, v1 in port_resource["properties"].items():
- if k1 != port_property:
- continue
- for v2 in v1:
- if "ip_address" not in v2:
- continue
- if "get_param" not in v2["ip_address"]:
- continue
- ip_address = v2["ip_address"]["get_param"]
-
- if isinstance(ip_address, list):
- ip_address = ip_address[0]
-
- if ip_address not in parameters:
- continue
-
- parameter_type = parameters[ip_address].get("type")
- if not parameter_type:
- continue
-
- valid_ip_address = is_valid_ip_address(
- ip_address,
- vm_type,
- network_role,
- port_property,
- parameter_type,
- network_type,
- )
-
- if not valid_ip_address:
- invalid_ip_addresses.append(ip_address)
-
- return invalid_ip_addresses
+ assert not invalid_ips, "%s" % "\n".join(invalid_ips)
def get_list_of_ports_attached_to_nova_server(nova_server):