aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils
diff options
context:
space:
mode:
authorstark, steven <steven.stark@att.com>2018-12-17 12:43:02 -0800
committerstark, steven <steven.stark@att.com>2018-12-17 13:04:00 -0800
commit1f4df7c7ad27b23773ad9cdbe4db1632ce388cf1 (patch)
tree8092104f8be23051ff81c9f71ee34116df4d33ba /ice_validator/tests/utils
parentca9085f0f77d442d3741a8c754e65cc45b6a318d (diff)
[VVP] updating validation scripts in dublin
- adding backlog of new validation scripts for dublin - updating existing tests - removing outdated tests Issue-ID: VVP-123 Change-Id: Ib8260889ac957c1dd28d8ede450fc8edc6fb0ec0 Signed-off-by: stark, steven <steven.stark@att.com>
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r--ice_validator/tests/utils/nested_dict.py30
-rw-r--r--ice_validator/tests/utils/nested_files.py239
-rw-r--r--ice_validator/tests/utils/nested_iterables.py65
-rw-r--r--ice_validator/tests/utils/network_roles.py189
-rw-r--r--ice_validator/tests/utils/ports.py201
-rw-r--r--ice_validator/tests/utils/vm_types.py33
-rw-r--r--ice_validator/tests/utils/volumes.py18
-rw-r--r--ice_validator/tests/utils/yaml_custom_utils.py7
8 files changed, 472 insertions, 310 deletions
diff --git a/ice_validator/tests/utils/nested_dict.py b/ice_validator/tests/utils/nested_dict.py
index 24f7e5e..692ef5f 100644
--- a/ice_validator/tests/utils/nested_dict.py
+++ b/ice_validator/tests/utils/nested_dict.py
@@ -38,28 +38,30 @@
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
#
-'''nested_dict.get
-'''
+"""nested_dict.get
+"""
-VERSION = '1.0.0'
+VERSION = "1.1.1"
-def get(dic, *keys):
- '''Return the value of the last key given a (nested) dict
- and list of keys. If any key is missing, or if the value
- of any key except the last is not a dict, then None is returned.
- '''
+def get(dic, *keys, **kwargs):
+ """Return the value of the last key given a (nested) dict and
+ list of keys. If any key is missing, or if the value of any key
+ except the last is not a dict, then the default value is returned.
+ The default value may be passed in using the keyword 'default=',
+ otherwise the default value is None.
+ """
d = dic
+ default = kwargs.get("default", None)
for key in keys:
- if hasattr(d, 'get'):
- d = d.get(key)
+ if hasattr(d, "get"):
+ d = d.get(key, default)
else:
- return None
+ return default
return d
def is_dict_has_key(obj, key):
- '''return True/False `obj` is a dict and has `key`
- '''
+ """return True/False `obj` is a dict and has `key`
+ """
return isinstance(obj, dict) and key in obj
-
diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py
index c551646..aff5a6b 100644
--- a/ice_validator/tests/utils/nested_files.py
+++ b/ice_validator/tests/utils/nested_files.py
@@ -44,16 +44,77 @@
from os import path
import re
from tests import cached_yaml as yaml
+from tests.structures import Heat
-VERSION = '1.0.2'
+VERSION = "1.4.0"
+
+"""
+test nesting depth
+0 -> 1 -> 2 -> too deep.
+"""
+MAX_DEPTH = 3
+
+
+def check_for_invalid_nesting( # pylint: disable=too-many-branches
+ yml, yaml_file, dirpath
+):
+ """
+ return a list of all nested files
+ """
+ if not hasattr(yml, "items"):
+ return []
+ invalid_nesting = []
+ p = re.compile("^[A-z]*::[A-z]*::[A-z]*$")
+
+ for v in yml.values():
+ if isinstance(v, dict) and "type" in v:
+ t = v["type"]
+ if t.endswith(".yml") or t.endswith(".yaml"):
+ filepath = path.join(dirpath, t)
+ elif t == "OS::Heat::ResourceGroup":
+ rd = v["properties"]["resource_def"]
+ if not isinstance(rd, dict) or "type" not in rd:
+ invalid_nesting.append(yaml_file)
+ continue
+ elif not p.match(rd["type"]):
+ filepath = path.join(dirpath, rd["type"])
+ else:
+ continue
+ else:
+ continue
+ try:
+ with open(filepath) as fh:
+ yml = yaml.load(fh)
+ except yaml.YAMLError as e:
+ invalid_nesting.append(filepath)
+ print(e) # pylint: disable=superfluous-parens
+ invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath))
+ if isinstance(v, dict):
+ invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath))
+ elif isinstance(v, list):
+ for d in v:
+ invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath))
+ return invalid_nesting
+
+
+def get_dict_of_nested_files(yml, dirpath):
+ """Return dict.
+ key: resource id in yml which references a nested file.
+ value: the nested file name.
+ Nested files are either referenced through "type", or
+ for OS::Heat::ResourceGroup, through "resource_def type".
+ """
+ nested_files = get_type_nested_files(yml, dirpath)
+ nested_files.update(get_resourcegroup_nested_files(yml, dirpath))
+ return nested_files
def get_list_of_nested_files(yml, dirpath):
- '''
+ """
return a list of all nested files
- '''
+ """
- if not hasattr(yml, 'items'):
+ if not hasattr(yml, "items"):
return []
nested_files = []
@@ -69,72 +130,132 @@ def get_list_of_nested_files(yml, dirpath):
nested_files.append(filepath)
nested_files.extend(get_list_of_nested_files(t_yml, dirpath))
elif t == "OS::Heat::ResourceGroup":
- rdt = (v.get("properties", {})
- .get("resource_def", {})
- .get("type", None))
+ rdt = v.get("properties", {}).get("resource_def", {}).get("type", None)
if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")):
filepath = path.join(dirpath, rdt)
if path.exists(filepath):
with open(filepath) as fh:
rdt_yml = yaml.load(fh)
nested_files.append(filepath)
- nested_files.extend(
- get_list_of_nested_files(rdt_yml, dirpath))
+ nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath))
if isinstance(v, dict):
- nested_files.extend(
- get_list_of_nested_files(v, dirpath))
+ nested_files.extend(get_list_of_nested_files(v, dirpath))
elif isinstance(v, list):
for d in v:
- nested_files.extend(
- get_list_of_nested_files(d, dirpath))
+ nested_files.extend(get_list_of_nested_files(d, dirpath))
return nested_files
-def check_for_invalid_nesting(yml, yaml_file, dirpath):
- '''
- return a list of all nested files
- '''
- if not hasattr(yml, 'items'):
- return []
- invalid_nesting = []
- p = re.compile('^[A-z]*::[A-z]*::[A-z]*$')
+def get_nesting(yaml_files):
+ """return bad, files, heat, depths
+ bad - list of error messages.
+ files - dict: key is filename, value is dict of nested files.
+ This is the tree.
+ heat - dict,: key is filename, value is Heat instance.
+ depths - dict: key is filename, value is a depth tuple
- for v in yml.values():
- if isinstance(v, dict) and "type" in v:
- t = v["type"]
- if t.endswith(".yml") or t.endswith(".yaml"):
- filepath = path.join(dirpath, t)
- elif t == "OS::Heat::ResourceGroup":
- rd = v["properties"]["resource_def"]
- if not isinstance(rd, dict) or "type" not in rd:
- invalid_nesting.append(yaml_file)
- continue
- elif not p.match(rd["type"]):
- filepath = path.join(dirpath, rd["type"])
- else:
- continue
- else:
- continue
- try:
- with open(filepath) as fh:
- yml = yaml.load(fh)
- except yaml.YAMLError as e:
- invalid_nesting.append(filepath)
- print(e) # pylint: disable=superfluous-parens
- invalid_nesting.extend(check_for_invalid_nesting(
- yml,
- filepath,
- dirpath))
- if isinstance(v, dict):
- invalid_nesting.extend(check_for_invalid_nesting(
- v,
- yaml_file,
- dirpath))
- elif isinstance(v, list):
- for d in v:
- invalid_nesting.extend(check_for_invalid_nesting(
- d,
- yaml_file,
- dirpath))
- return invalid_nesting
+ level: 0 1 2 3
+ file: template -> nested -> nested -> nested
+ depth: 3 2 1 0
+ """
+ bad = []
+ files = {}
+ heat = {}
+ depths = {}
+ for yaml_file in yaml_files:
+ dirname, basename = path.split(yaml_file)
+ h = Heat(filepath=yaml_file)
+ heat[basename] = h
+ files[basename] = get_dict_of_nested_files(h.yml, dirname)
+ for filename in files:
+ depths[filename] = _get_nesting_depth_start(0, filename, files, [])
+ for depth in depths[filename]:
+ if depth[0] > MAX_DEPTH:
+ bad.append("{} {}".format(filename, str(depth[1])))
+ return bad, files, heat, depths
+
+
+def _get_nesting_depth_start(depth, filename, files, context):
+ depths = []
+ for rid, nf in files[filename].items():
+ depths.append(_get_nesting_depth(1, nf, files, context))
+ return depths
+
+
+def _get_nesting_depth(depth, filename, files, context):
+ """Return a depth tuple (max_depth, current_context).
+ `context` is the list of filenames.
+ `depth` is the length of `context`.
+ Finds the max_depth of all the resources of `filename`.
+ current_context is the updated list of filenames
+ and max_depth is its length.
+ """
+ max_depth = depth + 1
+ current_context = context + [filename]
+ if depth <= MAX_DEPTH:
+ nested_filenames = files.get(filename, {})
+ if nested_filenames:
+ max_depth, current_context = max(
+ _get_nesting_depth(depth + 1, nested_filename, files, current_context)
+ for nested_filename in nested_filenames.values()
+ )
+ return max_depth, current_context
+
+def get_resourcegroup_nested_files(yml, dirpath):
+ """
+ return a dict.
+ key: key in yml which references a nested ResourceGroup file.
+ (resource->type is ResourceGroup
+ and resource->properties->resource_def->type is a yaml file)
+ value: the nested file name.
+
+ The keys are assumed to be unique across files.
+ A separate test checks for that.
+ """
+
+ if not hasattr(yml, "get"):
+ return {}
+
+ nested_files = {}
+ for rid, r in yml.get("resources", {}).items():
+ if isinstance(r, dict) and "type" in r:
+ t = r["type"]
+ nested_file = None
+ if t == "OS::Heat::ResourceGroup":
+ rdt = r.get("properties", {}).get("resource_def", {}).get("type", None)
+ if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")):
+ nested_file = rdt
+ if nested_file:
+ filepath = path.join(dirpath, nested_file)
+ if path.exists(filepath):
+ nested_files[rid] = nested_file
+ return nested_files
+
+
+def get_type_nested_files(yml, dirpath):
+ """
+ return a dict.
+ key: key in yml which references a nested type file.
+ (the resource "type" is a yaml file.)
+ value: the nested file name.
+
+ The keys are assumed to be unique across files.
+ A separate test checks for that.
+ """
+
+ if not hasattr(yml, "get"):
+ return {}
+
+ nested_files = {}
+ for rid, r in yml.get("resources", {}).items():
+ if isinstance(r, dict) and "type" in r:
+ t = r["type"]
+ nested_file = None
+ if t.endswith(".yml") or t.endswith(".yaml"):
+ nested_file = t
+ if nested_file:
+ filepath = path.join(dirpath, nested_file)
+ if path.exists(filepath):
+ nested_files[rid] = nested_file
+ return nested_files
diff --git a/ice_validator/tests/utils/nested_iterables.py b/ice_validator/tests/utils/nested_iterables.py
index b6deaba..0768339 100644
--- a/ice_validator/tests/utils/nested_iterables.py
+++ b/ice_validator/tests/utils/nested_iterables.py
@@ -40,10 +40,10 @@
def parse_nested_dict(d, key=""):
- '''
+ """
parse the nested dictionary and return values of
given key of function parameter only
- '''
+ """
nested_elements = []
for k, v in d.items():
if isinstance(v, dict):
@@ -60,29 +60,27 @@ def parse_nested_dict(d, key=""):
def find_all_get_param_in_yml(yml):
- '''
+ """
Recursively find all referenced parameters in a parsed yaml body
and return a list of parameters
- '''
- os_pseudo_parameters = ['OS::stack_name',
- 'OS::stack_id',
- 'OS::project_id']
+ """
+ os_pseudo_parameters = ["OS::stack_name", "OS::stack_id", "OS::project_id"]
- if not hasattr(yml, 'items'):
+ if not hasattr(yml, "items"):
return []
params = []
for k, v in yml.items():
- if k == 'get_param' and v not in os_pseudo_parameters:
+ if k == "get_param" and v not in os_pseudo_parameters:
if isinstance(v, list) and not isinstance(v[0], dict):
params.append(v[0])
elif not isinstance(v, dict) and isinstance(v, str):
params.append(v)
- for item in (v if isinstance(v, list) else [v]):
+ for item in v if isinstance(v, list) else [v]:
if isinstance(item, dict):
params.extend(find_all_get_param_in_yml(item))
continue
- elif k == 'list_join':
- for item in (v if isinstance(v, list) else [v]):
+ elif k == "list_join":
+ for item in v if isinstance(v, list) else [v]:
if isinstance(item, list):
for d in item:
params.extend(find_all_get_param_in_yml(d))
@@ -96,15 +94,15 @@ def find_all_get_param_in_yml(yml):
def find_all_get_resource_in_yml(yml):
- '''
+ """
Recursively find all referenced resources
in a parsed yaml body and return a list of resource ids
- '''
- if not hasattr(yml, 'items'):
+ """
+ if not hasattr(yml, "items"):
return []
resources = []
for k, v in yml.items():
- if k == 'get_resource':
+ if k == "get_resource":
if isinstance(v, list):
resources.append(v[0])
else:
@@ -119,15 +117,15 @@ def find_all_get_resource_in_yml(yml):
def find_all_get_file_in_yml(yml):
- '''
+ """
Recursively find all get_file in a parsed yaml body
and return the list of referenced files/urls
- '''
- if not hasattr(yml, 'items'):
+ """
+ if not hasattr(yml, "items"):
return []
resources = []
for k, v in yml.items():
- if k == 'get_file':
+ if k == "get_file":
if isinstance(v, list):
resources.append(v[0])
else:
@@ -142,37 +140,35 @@ def find_all_get_file_in_yml(yml):
def find_all_get_resource_in_resource(resource):
- '''
+ """
Recursively find all referenced resources
in a heat resource and return a list of resource ids
- '''
- if not hasattr(resource, 'items'):
+ """
+ if not hasattr(resource, "items"):
return []
resources = []
for k, v in resource.items():
- if k == 'get_resource':
+ if k == "get_resource":
if isinstance(v, list):
resources.append(v[0])
else:
resources.append(v)
continue
if isinstance(v, dict):
- resources.extend(
- find_all_get_resource_in_resource(v))
+ resources.extend(find_all_get_resource_in_resource(v))
elif isinstance(v, list):
for d in v:
- resources.extend(
- find_all_get_resource_in_resource(d))
+ resources.extend(find_all_get_resource_in_resource(d))
return resources
def get_associated_resources_per_resource(resources):
- '''
+ """
Recursively find all referenced resources for each resource
in a list of resource ids
- '''
- if not hasattr(resources, 'items'):
+ """
+ if not hasattr(resources, "items"):
return None
resources_dict = {}
@@ -183,8 +179,7 @@ def get_associated_resources_per_resource(resources):
get_resources = []
for k, v in res_value:
- if k == 'get_resource' and\
- isinstance(v, dict):
+ if k == "get_resource" and isinstance(v, dict):
get_resources = find_all_get_resource_in_resource(v)
# if resources found, add to dict
@@ -201,9 +196,9 @@ def get_associated_resources_per_resource(resources):
def flatten(items):
- '''
+ """
flatten items from any nested iterable
- '''
+ """
merged_list = []
for item in items:
diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py
index bed3a5a..ffb9870 100644
--- a/ice_validator/tests/utils/network_roles.py
+++ b/ice_validator/tests/utils/network_roles.py
@@ -41,111 +41,114 @@
import re
import socket
-
-def get_network_role_from_port(resource):
- '''
- get the network role from a neutron port resource
- '''
+PARAM_FORMATS = [
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_id")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_name")],
+]
+
+RESOURCE_FORMATS = [
+ re.compile(r"int_(.+?)_network"), # OS::ContrailV2::VirtualNetwork
+ re.compile(r"int_(.+?)_RVN"), # OS::ContrailV2::VirtualNetwork
+ re.compile(r"int_(.+?)"), # OS::Neutron::Net
+]
+
+
+def get_network_role_and_type(resource):
+ """
+ Derive the network role and type (internal vs. external) from an
+ OS::Neutron::Port.
+
+ :param resource: dict of Resource attributes
+ :return: tuple of (network_role, network_type) where network_type is
+ 'internal' or 'external'. Returns (None, None) if resource
+ is not a port or the values cannot be derived.
+ """
if not isinstance(resource, dict):
- return None
- if 'type' not in resource:
- return None
- if resource['type'] != 'OS::Neutron::Port':
- return None
- if 'properties' not in resource:
- return None
-
- formats = [
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_id')],
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_name')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_id')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_name')]]
-
- for k1, v1 in resource["properties"].items():
- if k1 != 'network':
- continue
-
- # get the network id or name
- network = (
- v1.get('get_param') or
- v1.get('get_resource'))
- if not network:
- continue
-
- for v2 in formats:
- m = v2[3].match(network)
+ return None, None
+ if resource.get("type", "") != "OS::Neutron::Port":
+ return None, None
+
+ network_props = resource.get("properties", {}).get("network", {})
+ is_resource = "get_resource" in network_props
+ if is_resource:
+ network = network_props.get("get_resource", "")
+ else:
+ network = network_props.get("get_param", "")
+
+ if is_resource: # connecting to an network in the template
+ for format in RESOURCE_FORMATS:
+ m = format.match(network)
if m and m.group(1):
- return m.group(1)
-
- return None
+ return m.group(1), "internal"
+ else:
+ for format in PARAM_FORMATS:
+ m = format[3].match(network)
+ if m and m.group(1):
+ return m.group(1), format[2]
+ return None, None
-def get_network_roles(resources):
- network_roles = []
+def get_network_role_from_port(resource):
+ """
+ Get the network-role from a OS::Neutron::Port resource. Returns None
+ if resource is not a port or the network-role cannot be derived
+ """
+ return get_network_role_and_type(resource)[0]
+
+
+def get_network_roles(resources, of_type=""):
+ """
+ Returns the network roles derived from the OS::Neutron::Port resources
+ in the collection of ``resources``. If ``of_type`` is not specified
+ then all network roles will be returned, or ``external`` or ``internal``
+ can be passed to select only those network roles
+
+ :param resources: collection of resource attributes (dict)
+ :param of_type: "internal" or "external"
+ :return: set of network roles discovered
+ """
+ valid_of_type = ("", "external", "internal")
+ if of_type not in ("", "external", "internal"):
+ raise RuntimeError("of_type must one of " + ", ".join(valid_of_type))
+ network_roles = set()
for v in resources.values():
- nr = get_network_role_from_port(v)
- if nr:
- network_roles.append(nr)
-
- return set(network_roles)
+ nr, nt = get_network_role_and_type(v)
+ if not nr:
+ continue
+ if not of_type:
+ network_roles.add(nr)
+ elif of_type and of_type == nt:
+ network_roles.add(nr)
+ return network_roles
def get_network_type_from_port(resource):
- '''
- get the network type from a neutron port resource
- '''
- if not isinstance(resource, dict):
- return None
- if 'type' not in resource:
- return None
- if resource['type'] != 'OS::Neutron::Port':
- return None
- if 'properties' not in resource:
- return None
-
- formats = [
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_id')],
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_name')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_id')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_name')]]
-
- for k1, v1 in resource["properties"].items():
- if k1 != 'network':
- continue
- if "get_param" not in v1:
- continue
- for v2 in formats:
- m = v2[3].match(v1["get_param"])
- if m and m.group(1):
- return v2[2]
-
- return None
+ """
+ Get the network-type (internal or external) from an OS::Neutron::Port
+ resource. Returns None if the resource is not a port or the type
+ cannot be derived.
+ """
+ return get_network_role_and_type(resource)[1]
-def is_valid_ip_address(ip_address, ip_type='ipv4'):
- '''
+def is_valid_ip_address(ip_address, ip_type="ipv4"):
+ """
check if an ip address is valid
- '''
- if ip_type == 'ipv4':
+ """
+ if ip_type == "ipv4":
return is_valid_ipv4_address(ip_address)
- elif ip_type == 'ipv6':
+ elif ip_type == "ipv6":
return is_valid_ipv6_address(ip_address)
return False
def is_valid_ipv4_address(ip_address):
- '''
+ """
check if an ip address of the type ipv4
is valid
- '''
+ """
try:
socket.inet_pton(socket.AF_INET, ip_address)
except AttributeError:
@@ -153,17 +156,17 @@ def is_valid_ipv4_address(ip_address):
socket.inet_aton(ip_address)
except (OSError, socket.error):
return False
- return ip_address.count('.') == 3
+ return ip_address.count(".") == 3
except (OSError, socket.error):
return False
return True
def is_valid_ipv6_address(ip_address):
- '''
+ """
check if an ip address of the type ipv6
is valid
- '''
+ """
try:
socket.inet_pton(socket.AF_INET6, ip_address)
except (OSError, socket.error):
@@ -172,17 +175,17 @@ def is_valid_ipv6_address(ip_address):
def property_uses_get_resource(resource, property_name):
- '''
+ """
returns true if a port's network property
uses the get_resource function
- '''
+ """
if not isinstance(resource, dict):
return False
- if 'properties' not in resource:
+ if "properties" not in resource:
return False
for k1, v1 in resource["properties"].items():
if k1 != property_name:
continue
- if "get_resource" in v1:
+ if isinstance(v1, dict) and "get_resource" in v1:
return True
return False
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index a2ae8a9..4029d3c 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -43,53 +43,96 @@ from .vm_types import get_vm_type_for_nova_server
import re
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type):
- '''
+def is_valid_ip_address(
+ ip_address, vm_type, network_role, port_property, parameter_type
+):
+ """
Check the ip_address to make sure it is properly formatted and
also contains {vm_type} and {network_role}
- '''
+ """
allowed_formats = [
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_ips')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_ips')]]
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
+ ],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_ips"),
+ ],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
+ ]
for v3 in allowed_formats:
if v3[1] != parameter_type:
@@ -99,33 +142,30 @@ def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parame
# check if pattern matches
m = v3[3].match(ip_address)
if m:
- if (v3[2] == "internal" and
- len(m.groups()) > 1):
- return m.group(1) == vm_type and\
- m.group(2) == network_role
- elif (v3[2] == "external" and
- len(m.groups()) > 0):
+ if v3[2] == "internal" and len(m.groups()) > 1:
+ return m.group(1) == vm_type and m.group(2) == network_role
+ elif v3[2] == "external" and len(m.groups()) > 0:
return m.group(1) == vm_type + "_" + network_role
return False
def get_invalid_ip_addresses(resources, port_property, parameters):
- '''
+ """
Get a list of valid ip addresses for a heat resources section
- '''
+ """
invalid_ip_addresses = []
for k, v in resources.items():
if not isinstance(v, dict):
continue
- if 'type' not in v:
+ if "type" not in v:
continue
- if v['type'] not in 'OS::Nova::Server':
+ if v["type"] not in "OS::Nova::Server":
continue
- if 'properties' not in v:
+ if "properties" not in v:
continue
- if 'networks' not in v['properties']:
+ if "networks" not in v["properties"]:
continue
port_resource = None
@@ -135,16 +175,16 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
continue
# get all ports associated with the nova server
- properties = v['properties']
- for network in properties['networks']:
+ properties = v["properties"]
+ for network in properties["networks"]:
for k3, v3 in network.items():
- if k3 != 'port':
+ if k3 != "port":
continue
if not isinstance(v3, dict):
continue
- if 'get_resource' in v3:
- port_id = v3['get_resource']
+ if "get_resource" in v3:
+ port_id = v3["get_resource"]
if not resources[port_id]:
continue
port_resource = resources[port_id]
@@ -175,11 +215,13 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
if not parameter_type:
continue
- valid_ip_address = is_valid_ip_address(ip_address,
- vm_type,
- network_role,
- port_property,
- parameter_type)
+ valid_ip_address = is_valid_ip_address(
+ ip_address,
+ vm_type,
+ network_role,
+ port_property,
+ parameter_type,
+ )
if not valid_ip_address:
invalid_ip_addresses.append(ip_address)
@@ -187,18 +229,17 @@ def get_invalid_ip_addresses(resources, port_property, parameters):
return invalid_ip_addresses
-def is_reserved_port(port_id):
- '''
- Checks to see if the resource id for a port follows
- the reserve port concept
- '''
- formats = [
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]]
- for f in formats:
- m = f[1].match(port_id.lower())
- if m and m.group(1):
- return True
- return False
+def get_list_of_ports_attached_to_nova_server(nova_server):
+ networks_list = nova_server.get("properties", {}).get("networks")
+
+ port_ids = []
+ if networks_list:
+ for network in networks_list:
+ network_prop = network.get("port")
+ if network_prop:
+ pid = network_prop.get("get_param")
+ if not pid:
+ pid = network_prop.get("get_resource")
+ port_ids.append(pid)
+
+ return port_ids
diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py
index 6802666..78006b9 100644
--- a/ice_validator/tests/utils/vm_types.py
+++ b/ice_validator/tests/utils/vm_types.py
@@ -42,36 +42,33 @@ import re
def get_vm_types_for_resource(resource):
- '''
+ """
Get all unique vm_types for a resource
Notes:
- Returns set([]) if the resource is not formatted
properly, the passed resource is not a nova server
- If more than one vm_type is detected all vm_types will
be returned
- '''
+ """
if not isinstance(resource, dict):
return set()
- if 'type' not in resource:
+ if "type" not in resource:
return set()
- if resource['type'] != 'OS::Nova::Server':
+ if resource["type"] != "OS::Nova::Server":
return set()
- if 'properties' not in resource:
+ if "properties" not in resource:
return set()
key_values = ["name", "flavor", "image"]
key_value_formats = [
- ["name", "string",
- re.compile(r'(.+?)_name_\d+')],
- ["name", "comma_delimited_list",
- re.compile(r'(.+?)_names')],
- ["flavor", "string",
- re.compile(r'(.+?)_flavor_name')],
- ["image", "string",
- re.compile(r'(.+?)_image_name')]]
+ ["name", "string", re.compile(r"(.+?)_name_\d+")],
+ ["name", "comma_delimited_list", re.compile(r"(.+?)_names")],
+ ["flavor", "string", re.compile(r"(.+?)_flavor_name")],
+ ["image", "string", re.compile(r"(.+?)_image_name")],
+ ]
vm_types = []
- for k2, v2 in resource['properties'].items():
+ for k2, v2 in resource["properties"].items():
if k2 not in key_values:
continue
if "get_param" not in v2:
@@ -89,12 +86,12 @@ def get_vm_types_for_resource(resource):
def get_vm_type_for_nova_server(resource):
- '''
+ """
Get the vm_type for a resource
Note: Returns None if not exactly one vm_type
is detected, if the resource is not formatted properly, or
the passed resource is not a nova server
- '''
+ """
vm_types = get_vm_types_for_resource(resource)
# if more than one vm_type was identified, return None
@@ -105,10 +102,10 @@ def get_vm_type_for_nova_server(resource):
def get_vm_types(resources):
- '''
+ """
Get all vm_types for a list of heat resources, do note that
some of the values retrieved may be invalid
- '''
+ """
vm_types = []
for v in resources.values():
vm_types.extend(list(get_vm_types_for_resource(v)))
diff --git a/ice_validator/tests/utils/volumes.py b/ice_validator/tests/utils/volumes.py
index 40731bf..6f52dd8 100644
--- a/ice_validator/tests/utils/volumes.py
+++ b/ice_validator/tests/utils/volumes.py
@@ -44,19 +44,19 @@
from os import path
from tests import cached_yaml as yaml
-VERSION = '1.0.0'
+VERSION = "1.0.0"
def get_volume_resources(heat_template):
- '''
+ """
get the resources from the volume template
Note: Returns an empty dict if there is no
volume template or for any other error
- '''
+ """
basename = path.splitext(heat_template)[0]
- for ext in ['.yaml', '.yml']:
- volume_template = basename + '_volume' + ext
+ for ext in [".yaml", ".yml"]:
+ volume_template = basename + "_volume" + ext
if path.isfile(volume_template):
break
else:
@@ -66,12 +66,12 @@ def get_volume_resources(heat_template):
with open(volume_template) as fh:
yml = yaml.load(fh)
except yaml.YAMLError as e:
- print(e) # pylint: disable=superfluous-parens
+ print(e) # pylint: disable=superfluous-parens
return {}
- if 'outputs' not in yml:
+ if "outputs" not in yml:
return {}
- if 'resources' not in yml:
+ if "resources" not in yml:
return {}
- return yml['resources']
+ return yml["resources"]
diff --git a/ice_validator/tests/utils/yaml_custom_utils.py b/ice_validator/tests/utils/yaml_custom_utils.py
index 8a5006c..1e4b0e5 100644
--- a/ice_validator/tests/utils/yaml_custom_utils.py
+++ b/ice_validator/tests/utils/yaml_custom_utils.py
@@ -50,8 +50,11 @@ def raise_duplicates_keys(loader, node, deep=False):
value = loader.construct_object(value_node, deep=deep)
if key in mapping:
raise ConstructorError(
- "while constructing a mapping", node.start_mark,
- "found duplicate key (%s)" % key, key_node.start_mark)
+ "while constructing a mapping",
+ node.start_mark,
+ "found duplicate key (%s)" % key,
+ key_node.start_mark,
+ )
mapping[key] = value
return loader.construct_mapping(node, deep)