aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/utils
diff options
context:
space:
mode:
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r--ice_validator/tests/utils/__init__.py6
-rw-r--r--ice_validator/tests/utils/nested_files.py50
-rw-r--r--ice_validator/tests/utils/network_roles.py82
-rw-r--r--ice_validator/tests/utils/ports.py176
-rw-r--r--ice_validator/tests/utils/vm_types.py34
5 files changed, 177 insertions, 171 deletions
diff --git a/ice_validator/tests/utils/__init__.py b/ice_validator/tests/utils/__init__.py
index e8f24cd..ec11176 100644
--- a/ice_validator/tests/utils/__init__.py
+++ b/ice_validator/tests/utils/__init__.py
@@ -2,11 +2,11 @@
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
+# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -21,7 +21,7 @@
#
#
# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
diff --git a/ice_validator/tests/utils/nested_files.py b/ice_validator/tests/utils/nested_files.py
index 39306eb..02f733d 100644
--- a/ice_validator/tests/utils/nested_files.py
+++ b/ice_validator/tests/utils/nested_files.py
@@ -45,15 +45,15 @@ from os import path
import re
import yaml
-VERSION = '1.0.2'
+VERSION = "1.0.2"
def get_list_of_nested_files(yml, dirpath):
- '''
+ """
return a list of all nested files
- '''
+ """
- if not hasattr(yml, 'items'):
+ if not hasattr(yml, "items"):
return []
nested_files = []
@@ -68,34 +68,29 @@ def get_list_of_nested_files(yml, dirpath):
nested_files.append(filepath)
nested_files.extend(get_list_of_nested_files(t_yml, dirpath))
elif t == "OS::Heat::ResourceGroup":
- rdt = (v.get("properties", {})
- .get("resource_def", {})
- .get("type", None))
+ rdt = v.get("properties", {}).get("resource_def", {}).get("type", None)
if rdt and (rdt.endswith(".yml") or rdt.endswith(".yaml")):
filepath = path.join(dirpath, rdt)
with open(filepath) as fh:
rdt_yml = yaml.load(fh)
nested_files.append(filepath)
- nested_files.extend(
- get_list_of_nested_files(rdt_yml, dirpath))
+ nested_files.extend(get_list_of_nested_files(rdt_yml, dirpath))
if isinstance(v, dict):
- nested_files.extend(
- get_list_of_nested_files(v, dirpath))
+ nested_files.extend(get_list_of_nested_files(v, dirpath))
elif isinstance(v, list):
for d in v:
- nested_files.extend(
- get_list_of_nested_files(d, dirpath))
+ nested_files.extend(get_list_of_nested_files(d, dirpath))
return nested_files
def check_for_invalid_nesting(yml, yaml_file, dirpath):
- '''
+ """
return a list of all nested files
- '''
- if not hasattr(yml, 'items'):
+ """
+ if not hasattr(yml, "items"):
return []
invalid_nesting = []
- p = re.compile('^[A-z]*::[A-z]*::[A-z]*$')
+ p = re.compile("^[A-z]*::[A-z]*::[A-z]*$")
for v in yml.values():
if isinstance(v, dict) and "type" in v:
@@ -108,8 +103,8 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath):
invalid_nesting.append(yaml_file)
continue
elif not p.match(rd["type"]) and not (
- rd["type"].endswith(".yml")
- or rd["type"].endswith(".yaml")):
+ rd["type"].endswith(".yml") or rd["type"].endswith(".yaml")
+ ):
filepath = path.join(dirpath, rd["type"])
else:
continue
@@ -120,20 +115,11 @@ def check_for_invalid_nesting(yml, yaml_file, dirpath):
yml = yaml.load(fh)
except yaml.YAMLError as e:
invalid_nesting.append(filepath)
- print(e) # pylint: disable=superfluous-parens
- invalid_nesting.extend(check_for_invalid_nesting(
- yml,
- filepath,
- dirpath))
+ print(e) # pylint: disable=superfluous-parens
+ invalid_nesting.extend(check_for_invalid_nesting(yml, filepath, dirpath))
if isinstance(v, dict):
- invalid_nesting.extend(check_for_invalid_nesting(
- v,
- yaml_file,
- dirpath))
+ invalid_nesting.extend(check_for_invalid_nesting(v, yaml_file, dirpath))
elif isinstance(v, list):
for d in v:
- invalid_nesting.extend(check_for_invalid_nesting(
- d,
- yaml_file,
- dirpath))
+ invalid_nesting.extend(check_for_invalid_nesting(d, yaml_file, dirpath))
return invalid_nesting
diff --git a/ice_validator/tests/utils/network_roles.py b/ice_validator/tests/utils/network_roles.py
index 3f8d6b8..d4b2cce 100644
--- a/ice_validator/tests/utils/network_roles.py
+++ b/ice_validator/tests/utils/network_roles.py
@@ -43,37 +43,31 @@ import socket
def get_network_role_from_port(resource):
- '''
+ """
get the network role from a neutron port resource
- '''
+ """
if not isinstance(resource, dict):
return None
- if 'type' not in resource:
+ if "type" not in resource:
return None
- if resource['type'] != 'OS::Neutron::Port':
+ if resource["type"] != "OS::Neutron::Port":
return None
- if 'properties' not in resource:
+ if "properties" not in resource:
return None
formats = [
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_id')],
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_name')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_id')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_name')],
- ]
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_id")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_name")],
+ ]
for k1, v1 in resource["properties"].items():
- if k1 != 'network':
+ if k1 != "network":
continue
# get the network id or name
- network = (
- v1.get('get_param') or
- v1.get('get_resource'))
+ network = v1.get("get_param") or v1.get("get_resource")
if not network:
continue
@@ -86,31 +80,27 @@ def get_network_role_from_port(resource):
def get_network_type_from_port(resource):
- '''
+ """
get the network type from a neutron port resource
- '''
+ """
if not isinstance(resource, dict):
return None
- if 'type' not in resource:
+ if "type" not in resource:
return None
- if resource['type'] != 'OS::Neutron::Port':
+ if resource["type"] != "OS::Neutron::Port":
return None
- if 'properties' not in resource:
+ if "properties" not in resource:
return None
formats = [
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_id')],
- ["network", "string", "internal",
- re.compile(r'int_(.+?)_net_name')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_id')],
- ["network", "string", "external",
- re.compile(r'(.+?)_net_name')],
- ]
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_id")],
+ ["network", "string", "internal", re.compile(r"int_(.+?)_net_name")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_id")],
+ ["network", "string", "external", re.compile(r"(.+?)_net_name")],
+ ]
for k1, v1 in resource["properties"].items():
- if k1 != 'network':
+ if k1 != "network":
continue
if "get_param" not in v1:
continue
@@ -122,22 +112,22 @@ def get_network_type_from_port(resource):
return None
-def is_valid_ip_address(ip_address, ip_type='ipv4'):
- '''
+def is_valid_ip_address(ip_address, ip_type="ipv4"):
+ """
check if an ip address is valid
- '''
- if ip_type == 'ipv4':
+ """
+ if ip_type == "ipv4":
return is_valid_ipv4_address(ip_address)
- elif ip_type == 'ipv6':
+ elif ip_type == "ipv6":
return is_valid_ipv6_address(ip_address)
return False
def is_valid_ipv4_address(ip_address):
- '''
+ """
check if an ip address of the type ipv4
is valid
- '''
+ """
try:
socket.inet_pton(socket.AF_INET, ip_address)
except AttributeError:
@@ -145,17 +135,17 @@ def is_valid_ipv4_address(ip_address):
socket.inet_aton(ip_address)
except (OSError, socket.error):
return False
- return ip_address.count('.') == 3
+ return ip_address.count(".") == 3
except (OSError, socket.error):
return False
return True
def is_valid_ipv6_address(ip_address):
- '''
+ """
check if an ip address of the type ipv6
is valid
- '''
+ """
try:
socket.inet_pton(socket.AF_INET6, ip_address)
except (OSError, socket.error):
@@ -164,13 +154,13 @@ def is_valid_ipv6_address(ip_address):
def property_uses_get_resource(resource, property_name):
- '''
+ """
returns true if a port's network property
uses the get_resource function
- '''
+ """
if not isinstance(resource, dict):
return False
- if 'properties' not in resource:
+ if "properties" not in resource:
return False
for k1, v1 in resource["properties"].items():
if k1 != property_name:
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py
index c9d4be9..e479201 100644
--- a/ice_validator/tests/utils/ports.py
+++ b/ice_validator/tests/utils/ports.py
@@ -44,53 +44,93 @@ import re
def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
- '''
+ """
Check the ip_address to make sure it is properly formatted and
also contains {vm_type} and {network_role}
- '''
+ """
allowed_formats = [
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_ips')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_ips')],
- ]
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_v6_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "external",
+ re.compile(r"(.+?)_floating_ip"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
+ ],
+ [
+ "allowed_address_pairs",
+ "string",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
+ ],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_v6_ips"),
+ ],
+ [
+ "allowed_address_pairs",
+ "comma_delimited_list",
+ "external",
+ re.compile(r"(.+?)_ips"),
+ ],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
+ ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_v6_ips"),
+ ],
+ [
+ "fixed_ips",
+ "comma_delimited_list",
+ "internal",
+ re.compile(r"(.+?)_int_(.+?)_ips"),
+ ],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
+ ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
+ ]
for v3 in allowed_formats:
if v3[0] != port_property:
@@ -98,33 +138,30 @@ def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
# check if pattern matches
m = v3[3].match(ip_address)
if m:
- if (v3[2] == "internal" and
- len(m.groups()) > 1):
- return m.group(1) == vm_type and\
- m.group(2) == network_role
- elif (v3[2] == "external" and
- len(m.groups()) > 0):
+ if v3[2] == "internal" and len(m.groups()) > 1:
+ return m.group(1) == vm_type and m.group(2) == network_role
+ elif v3[2] == "external" and len(m.groups()) > 0:
return m.group(1) == vm_type + "_" + network_role
return False
def get_invalid_ip_addresses(resources, port_property):
- '''
+ """
Get a list of valid ip addresses for a heat resources section
- '''
+ """
invalid_ip_addresses = []
for k, v in resources.items():
if not isinstance(v, dict):
continue
- if 'type' not in v:
+ if "type" not in v:
continue
- if v['type'] not in 'OS::Nova::Server':
+ if v["type"] not in "OS::Nova::Server":
continue
- if 'properties' not in v:
+ if "properties" not in v:
continue
- if 'networks' not in v['properties']:
+ if "networks" not in v["properties"]:
continue
port_resource = None
@@ -134,16 +171,16 @@ def get_invalid_ip_addresses(resources, port_property):
continue
# get all ports associated with the nova server
- properties = v['properties']
- for network in properties['networks']:
+ properties = v["properties"]
+ for network in properties["networks"]:
for k3, v3 in network.items():
- if k3 != 'port':
+ if k3 != "port":
continue
if not isinstance(v3, dict):
continue
- if 'get_resource' in v3:
- port_id = v3['get_resource']
+ if "get_resource" in v3:
+ port_id = v3["get_resource"]
if not resources[port_id]:
continue
port_resource = resources[port_id]
@@ -168,10 +205,9 @@ def get_invalid_ip_addresses(resources, port_property):
if isinstance(ip_address, list):
ip_address = ip_address[0]
- valid_ip_address = is_valid_ip_address(ip_address,
- vm_type,
- network_role,
- port_property)
+ valid_ip_address = is_valid_ip_address(
+ ip_address, vm_type, network_role, port_property
+ )
if not valid_ip_address:
invalid_ip_addresses.append(ip_address)
@@ -180,16 +216,14 @@ def get_invalid_ip_addresses(resources, port_property):
def is_reserved_port(port_id):
- '''
+ """
Checks to see if the resource id for a port follows
the reserve port concept
- '''
+ """
formats = [
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')],
- ]
+ ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")],
+ ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")],
+ ]
for f in formats:
m = f[1].match(port_id.lower())
if m and m.group(1):
diff --git a/ice_validator/tests/utils/vm_types.py b/ice_validator/tests/utils/vm_types.py
index 41da7d9..78006b9 100644
--- a/ice_validator/tests/utils/vm_types.py
+++ b/ice_validator/tests/utils/vm_types.py
@@ -42,37 +42,33 @@ import re
def get_vm_types_for_resource(resource):
- '''
+ """
Get all unique vm_types for a resource
Notes:
- Returns set([]) if the resource is not formatted
properly, the passed resource is not a nova server
- If more than one vm_type is detected all vm_types will
be returned
- '''
+ """
if not isinstance(resource, dict):
return set()
- if 'type' not in resource:
+ if "type" not in resource:
return set()
- if resource['type'] != 'OS::Nova::Server':
+ if resource["type"] != "OS::Nova::Server":
return set()
- if 'properties' not in resource:
+ if "properties" not in resource:
return set()
key_values = ["name", "flavor", "image"]
key_value_formats = [
- ["name", "string",
- re.compile(r'(.+?)_name_\d+')],
- ["name", "comma_delimited_list",
- re.compile(r'(.+?)_names')],
- ["flavor", "string",
- re.compile(r'(.+?)_flavor_name')],
- ["image", "string",
- re.compile(r'(.+?)_image_name')],
- ]
+ ["name", "string", re.compile(r"(.+?)_name_\d+")],
+ ["name", "comma_delimited_list", re.compile(r"(.+?)_names")],
+ ["flavor", "string", re.compile(r"(.+?)_flavor_name")],
+ ["image", "string", re.compile(r"(.+?)_image_name")],
+ ]
vm_types = []
- for k2, v2 in resource['properties'].items():
+ for k2, v2 in resource["properties"].items():
if k2 not in key_values:
continue
if "get_param" not in v2:
@@ -90,12 +86,12 @@ def get_vm_types_for_resource(resource):
def get_vm_type_for_nova_server(resource):
- '''
+ """
Get the vm_type for a resource
Note: Returns None if not exactly one vm_type
is detected, if the resource is not formatted properly, or
the passed resource is not a nova server
- '''
+ """
vm_types = get_vm_types_for_resource(resource)
# if more than one vm_type was identified, return None
@@ -106,10 +102,10 @@ def get_vm_type_for_nova_server(resource):
def get_vm_types(resources):
- '''
+ """
Get all vm_types for a list of heat resources, do note that
some of the values retrieved may be invalid
- '''
+ """
vm_types = []
for v in resources.values():
vm_types.extend(list(get_vm_types_for_resource(v)))