aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils
diff options
context:
space:
mode:
authorstark, steven <ss820f@att.com>2018-09-13 16:49:43 -0700
committerstark, steven <ss820f@att.com>2018-09-13 17:21:45 -0700
commit31d5da59b39d38760cc519a2c5e5b70357b539e8 (patch)
tree7298d118aefb1e7494afa5cbedaa5e499b78373d /ice_validator/tests/utils
parent8de26dd1cc9ed33c3ab85a5014ac949f174db932 (diff)
[VVP] udpating scripts for casablanca
adding new "infrastructure" scripts addresses bugs VVP-100, VVP-101, VVP-102 adding base tests updating tests where arguments have changed Adds traceability for task VVP-92 Change-Id: I067d8e80934403039e66fbc9fc93766587f67b4e Issue-ID: VVP-80 Signed-off-by: stark, steven <ss820f@att.com>
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r--ice_validator/tests/utils/nested_dict.py1
-rw-r--r--ice_validator/tests/utils/nested_files.py69
-rw-r--r--ice_validator/tests/utils/network_roles.py90
-rw-r--r--ice_validator/tests/utils/ports.py189
-rw-r--r--ice_validator/tests/utils/vm_types.py33
-rw-r--r--ice_validator/tests/utils/volumes.py2
6 files changed, 197 insertions, 187 deletions
diff --git a/ice_validator/tests/utils/nested_dict.py b/ice_validator/tests/utils/nested_dict.py
index 9bc3e99..24f7e5e 100644
--- a/ice_validator/tests/utils/nested_dict.py
+++ b/ice_validator/tests/utils/nested_dict.py
@@ -62,3 +62,4 @@ def is_dict_has_key(obj, key):
'''return True/False `obj` is a dict and has `key`
'''
return isinstance(obj, dict) and key in obj
+
diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py
index 02f733d..c551646 100644
--- a/ice_validator/tests/utils/nested_files.py
+++ b/ice_validator/tests/utils/nested_files.py
@@ -43,17 +43,17 @@
from os import path
import re
-import yaml
+from tests import cached_yaml as yaml
-VERSION = "1.0.2"
+VERSION = '1.0.2'
def get_list_of_nested_files(yml, dirpath):
- """
+ '''
return a list of all nested files
- """
+ '''
- if not hasattr(yml, "items"):
+ if not hasattr(yml, 'items'):
return []
nested_files = []
@@ -63,34 +63,41 @@ def get_list_of_nested_files(yml, dirpath):
t = v["type"]
if t.endswith(".yml") or t.endswith(".yaml"):
filepath = path.join(dirpath, t)
- with open(filepath) as fh:
- t_yml = yaml.load(fh)
- nested_files.append(filepath)
- nested_files.extend(get_list_of_nested_files(t_yml, dirpath))
+ if path.exists(filepath):
+ with open(filepath) as fh:
+ t_yml = yaml.load(fh)
+ nested_files.append(filepath)
+ nested_files.extend(get_list_of_nested_files(t_yml, dirpath))
elif t == "OS::Heat::ResourceGroup":
- rdt = v.get("properties", {}).get("resource_def", {}).get("type", None)
+ rdt = (v.get("properties", {})
+ .get("resource_def", {})
+ .get("type", None))
if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")):
filepath = path.join(dirpath, rdt)
- with open(filepath) as fh:
- rdt_yml = yaml.load(fh)
- nested_files.append(filepath)
- nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath))
+ if path.exists(filepath):
+ with open(filepath) as fh:
+ rdt_yml = yaml.load(fh)
+ nested_files.append(filepath)
+ nested_files.extend(
+ get_list_of_nested_files(rdt_yml, dirpath))
if isinstance(v, dict):
- nested_files.extend(get_list_of_nested_files(v, dirpath))
+ nested_files.extend(
+ get_list_of_nested_files(v, dirpath))
elif isinstance(v, list):
for d in v:
- nested_files.extend(get_list_of_nested_files(d, dirpath))
+ nested_files.extend(
+ get_list_of_nested_files(d, dirpath))
return nested_files
def check_for_invalid_nesting(yml, yaml_file, dirpath):
- """
+ '''
return a list of all nested files
- """
- if not hasattr(yml, "items"):
+ '''
+ if not hasattr(yml, 'items'):
return []
invalid_nesting = []
- p = re.compile("^[A-z]*::[A-z]*::[A-z]*$")
+ p = re.compile('^[A-z]*::[A-z]*::[A-z]*$')
for v in yml.values():
if isinstance(v, dict) and "type" in v:
@@ -102,9 +109,7 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath):
if not isinstance(rd, dict) or "type" not in rd:
invalid_nesting.append(yaml_file)
continue
- elif not p.match(rd["type"]) and not (
- rd["type"].endswith(".yml") or rd["type"].endswith(".yaml")
- ):
+ elif not p.match(rd["type"]):
filepath = path.join(dirpath, rd["type"])
else:
continue
@@ -115,11 +120,21 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath):
yml = yaml.load(fh)
except yaml.YAMLError as e:
invalid_nesting.append(filepath)
- print(e) # pylint: disable=superfluous-parens
- invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath))
+ print(e) # pylint: disable=superfluous-parens
+ invalid_nesting.extend(check_for_invalid_nesting(
+ yml,
+ filepath,
+ dirpath))
if isinstance(v, dict):
- invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath))
+ invalid_nesting.extend(check_for_invalid_nesting(
+ v,
+ yaml_file,
+ dirpath))
elif isinstance(v, list):
for d in v:
- invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath))
+ invalid_nesting.extend(check_for_invalid_nesting(
+ d,
+ yaml_file,
+ dirpath))
return invalid_nesting
+
diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py
index d4b2cce..bed3a5a 100644
--- a/ice_validator/tests/utils/network_roles.py
+++ b/ice_validator/tests/utils/network_roles.py
@@ -43,31 +43,36 @@ import socket
def get_network_role_from_port(resource):
- """
+ '''
get the network role from a neutron port resource
- """
+ '''
if not isinstance(resource, dict):
return None
- if "type" not in resource:
+ if 'type' not in resource:
return None
- if resource["type"] != "OS::Neutron::Port":
+ if resource['type'] != 'OS::Neutron::Port':
return None
- if "properties" not in resource:
+ if 'properties' not in resource:
return None
formats = [
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
- ["network", "string", "external", re.compile(r"(.+?)_net_id")],
- ["network", "string", "external", re.compile(r"(.+?)_net_name")],
- ]
+ ["network", "string", "internal",
+ re.compile(r'int_(.+?)_net_id')],
+ ["network", "string", "internal",
+ re.compile(r'int_(.+?)_net_name')],
+ ["network", "string", "external",
+ re.compile(r'(.+?)_net_id')],
+ ["network", "string", "external",
+ re.compile(r'(.+?)_net_name')]]
for k1, v1 in resource["properties"].items():
- if k1 != "network":
+ if k1 != 'network':
continue
# get the network id or name
- network = v1.get("get_param") or v1.get("get_resource")
+ network = (
+ v1.get('get_param') or
+ v1.get('get_resource'))
if not network:
continue
@@ -79,28 +84,41 @@ def get_network_role_from_port(resource):
return None
+def get_network_roles(resources):
+ network_roles = []
+ for v in resources.values():
+ nr = get_network_role_from_port(v)
+ if nr:
+ network_roles.append(nr)
+
+ return set(network_roles)
+
+
def get_network_type_from_port(resource):
- """
+ '''
get the network type from a neutron port resource
- """
+ '''
if not isinstance(resource, dict):
return None
- if "type" not in resource:
+ if 'type' not in resource:
return None
- if resource["type"] != "OS::Neutron::Port":
+ if resource['type'] != 'OS::Neutron::Port':
return None
- if "properties" not in resource:
+ if 'properties' not in resource:
return None
formats = [
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
- ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
- ["network", "string", "external", re.compile(r"(.+?)_net_id")],
- ["network", "string", "external", re.compile(r"(.+?)_net_name")],
- ]
+ ["network", "string", "internal",
+ re.compile(r'int_(.+?)_net_id')],
+ ["network", "string", "internal",
+ re.compile(r'int_(.+?)_net_name')],
+ ["network", "string", "external",
+ re.compile(r'(.+?)_net_id')],
+ ["network", "string", "external",
+ re.compile(r'(.+?)_net_name')]]
for k1, v1 in resource["properties"].items():
- if k1 != "network":
+ if k1 != 'network':
continue
if "get_param" not in v1:
continue
@@ -112,22 +130,22 @@ def get_network_type_from_port(resource):
return None
-def is_valid_ip_address(ip_address, ip_type="ipv4"):
- """
+def is_valid_ip_address(ip_address, ip_type='ipv4'):
+ '''
check if an ip address is valid
- """
- if ip_type == "ipv4":
+ '''
+ if ip_type == 'ipv4':
return is_valid_ipv4_address(ip_address)
- elif ip_type == "ipv6":
+ elif ip_type == 'ipv6':
return is_valid_ipv6_address(ip_address)
return False
def is_valid_ipv4_address(ip_address):
- """
+ '''
check if an ip address of the type ipv4
is valid
- """
+ '''
try:
socket.inet_pton(socket.AF_INET, ip_address)
except AttributeError:
@@ -135,17 +153,17 @@ def is_valid_ipv4_address(ip_address):
socket.inet_aton(ip_address)
except (OSError, socket.error):
return False
- return ip_address.count(".") == 3
+ return ip_address.count('.') == 3
except (OSError, socket.error):
return False
return True
def is_valid_ipv6_address(ip_address):
- """
+ '''
check if an ip address of the type ipv6
is valid
- """
+ '''
try:
socket.inet_pton(socket.AF_INET6, ip_address)
except (OSError, socket.error):
@@ -154,13 +172,13 @@ def is_valid_ipv6_address(ip_address):
def property_uses_get_resource(resource, property_name):
- """
+ '''
returns true if a port's network property
uses the get_resource function
- """
+ '''
if not isinstance(resource, dict):
return False
- if "properties" not in resource:
+ if 'properties' not in resource:
return False
for k1, v1 in resource["properties"].items():
if k1 != property_name:
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index e479201..a2ae8a9 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -43,125 +43,89 @@ from .vm_types import get_vm_type_for_nova_server
import re
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
- """
+def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type):
+ '''
Check the ip_address to make sure it is properly formatted and
also contains {vm_type} and {network_role}
- """
+ '''
allowed_formats = [
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
- ],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_ips"),
- ],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
- ]
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_floating_ip')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_floating_v6_ip')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_floating_ip')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
+ ["allowed_address_pairs", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_v6_ip_\d+')],
+ ["allowed_address_pairs", "string", "external",
+ re.compile(r'(.+?)_ip_\d+')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "external", re.compile(r'(.+?)_v6_ips')],
+ ["allowed_address_pairs", "comma_delimited_list",
+ "external", re.compile(r'(.+?)_ips')],
+ ["fixed_ips", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
+ ["fixed_ips", "string", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
+ ["fixed_ips", "string", "external",
+ re.compile(r'(.+?)_v6_ip_\d+')],
+ ["fixed_ips", "string", "external",
+ re.compile(r'(.+?)_ip_\d+')],
+ ["fixed_ips", "comma_delimited_list", "internal",
+ re.compile(r'(.+?)_int_(.+?)_v6_ips')],
+ ["fixed_ips", "comma_delimited_list", "internal",
+ re.compile(r'(.+?)_int_(.+?)_ips')],
+ ["fixed_ips", "comma_delimited_list", "external",
+ re.compile(r'(.+?)_v6_ips')],
+ ["fixed_ips", "comma_delimited_list", "external",
+ re.compile(r'(.+?)_ips')]]
for v3 in allowed_formats:
+ if v3[1] != parameter_type:
+ continue
if v3[0] != port_property:
continue
# check if pattern matches
m = v3[3].match(ip_address)
if m:
- if v3[2] == "internal" and len(m.groups()) > 1:
- return m.group(1) == vm_type and m.group(2) == network_role
- elif v3[2] == "external" and len(m.groups()) > 0:
+ if (v3[2] == "internal" and
+ len(m.groups()) > 1):
+ return m.group(1) == vm_type and\
+ m.group(2) == network_role
+ elif (v3[2] == "external" and
+ len(m.groups()) > 0):
return m.group(1) == vm_type + "_" + network_role
return False
-def get_invalid_ip_addresses(resources, port_property):
- """
+def get_invalid_ip_addresses(resources, port_property, parameters):
+ '''
Get a list of valid ip addresses for a heat resources section
- """
+ '''
invalid_ip_addresses = []
for k, v in resources.items():
if not isinstance(v, dict):
continue
- if "type" not in v:
+ if 'type' not in v:
continue
- if v["type"] not in "OS::Nova::Server":
+ if v['type'] not in 'OS::Nova::Server':
continue
- if "properties" not in v:
+ if 'properties' not in v:
continue
- if "networks" not in v["properties"]:
+ if 'networks' not in v['properties']:
continue
port_resource = None
@@ -171,16 +135,16 @@ def get_invalid_ip_addresses(resources, port_property):
continue
# get all ports associated with the nova server
- properties = v["properties"]
- for network in properties["networks"]:
+ properties = v['properties']
+ for network in properties['networks']:
for k3, v3 in network.items():
- if k3 != "port":
+ if k3 != 'port':
continue
if not isinstance(v3, dict):
continue
- if "get_resource" in v3:
- port_id = v3["get_resource"]
+ if 'get_resource' in v3:
+ port_id = v3['get_resource']
if not resources[port_id]:
continue
port_resource = resources[port_id]
@@ -199,15 +163,23 @@ def get_invalid_ip_addresses(resources, port_property):
continue
if "get_param" not in v2["ip_address"]:
continue
-
ip_address = v2["ip_address"]["get_param"]
if isinstance(ip_address, list):
ip_address = ip_address[0]
- valid_ip_address = is_valid_ip_address(
- ip_address, vm_type, network_role, port_property
- )
+ if ip_address not in parameters:
+ continue
+
+ parameter_type = parameters[ip_address].get("type")
+ if not parameter_type:
+ continue
+
+ valid_ip_address = is_valid_ip_address(ip_address,
+ vm_type,
+ network_role,
+ port_property,
+ parameter_type)
if not valid_ip_address:
invalid_ip_addresses.append(ip_address)
@@ -216,14 +188,15 @@ def get_invalid_ip_addresses(resources, port_property):
def is_reserved_port(port_id):
- """
+ '''
Checks to see if the resource id for a port follows
the reserve port concept
- """
+ '''
formats = [
- ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")],
- ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")],
- ]
+ ["port_id",
+ re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
+ ["port_id",
+ re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]]
for f in formats:
m = f[1].match(port_id.lower())
if m and m.group(1):
diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py
index 78006b9..6802666 100644
--- a/ice_validator/tests/utils/vm_types.py
+++ b/ice_validator/tests/utils/vm_types.py
@@ -42,33 +42,36 @@ import re
def get_vm_types_for_resource(resource):
- """
+ '''
Get all unique vm_types for a resource
Notes:
- Returns set([]) if the resource is not formatted
properly, the passed resource is not a nova server
- If more than one vm_type is detected all vm_types will
be returned
- """
+ '''
if not isinstance(resource, dict):
return set()
- if "type" not in resource:
+ if 'type' not in resource:
return set()
- if resource["type"] != "OS::Nova::Server":
+ if resource['type'] != 'OS::Nova::Server':
return set()
- if "properties" not in resource:
+ if 'properties' not in resource:
return set()
key_values = ["name", "flavor", "image"]
key_value_formats = [
- ["name", "string", re.compile(r"(.+?)_name_\d+")],
- ["name", "comma_delimited_list", re.compile(r"(.+?)_names")],
- ["flavor", "string", re.compile(r"(.+?)_flavor_name")],
- ["image", "string", re.compile(r"(.+?)_image_name")],
- ]
+ ["name", "string",
+ re.compile(r'(.+?)_name_\d+')],
+ ["name", "comma_delimited_list",
+ re.compile(r'(.+?)_names')],
+ ["flavor", "string",
+ re.compile(r'(.+?)_flavor_name')],
+ ["image", "string",
+ re.compile(r'(.+?)_image_name')]]
vm_types = []
- for k2, v2 in resource["properties"].items():
+ for k2, v2 in resource['properties'].items():
if k2 not in key_values:
continue
if "get_param" not in v2:
@@ -86,12 +89,12 @@ def get_vm_types_for_resource(resource):
def get_vm_type_for_nova_server(resource):
- """
+ '''
Get the vm_type for a resource
Note: Returns None if not exactly one vm_type
is detected, if the resource is not formatted properly, or
the passed resource is not a nova server
- """
+ '''
vm_types = get_vm_types_for_resource(resource)
# if more than one vm_type was identified, return None
@@ -102,10 +105,10 @@ def get_vm_type_for_nova_server(resource):
def get_vm_types(resources):
- """
+ '''
Get all vm_types for a list of heat resources, do note that
some of the values retrieved may be invalid
- """
+ '''
vm_types = []
for v in resources.values():
vm_types.extend(list(get_vm_types_for_resource(v)))
diff --git a/ice_validator/tests/utils/volumes.py b/ice_validator/tests/utils/volumes.py
index c64c0ee..40731bf 100644
--- a/ice_validator/tests/utils/volumes.py
+++ b/ice_validator/tests/utils/volumes.py
@@ -42,7 +42,7 @@
"""
from os import path
-import yaml
+from tests import cached_yaml as yaml
VERSION = '1.0.0'