diff options
Diffstat (limited to 'ice_validator/tests/utils')
-rw-r--r-- | ice_validator/tests/utils/ports.py | 234 |
1 files changed, 140 insertions, 94 deletions
diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index d65202c..1f5f658 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -78,7 +78,6 @@ def check_parameter_format( heat = Heat(filepath=yaml_file) resource_type = resource_processor.resource_type resources = heat.get_resource_by_type(resource_type) - heat_parameters = heat.parameters for rid, resource in resources.items(): resource_intext, port_match = resource_processor.get_rid_match_tuple(rid) if not port_match: @@ -88,7 +87,6 @@ def check_parameter_format( resource_intext != intext ): # skipping if type (internal/external) doesn't match continue - for param in prop_iterator(resource, *properties): if ( param @@ -96,108 +94,156 @@ def check_parameter_format( and "get_resource" not in param and "get_attr" not in param ): - # checking parameter uses get_param - parameter = param.get("get_param") - if not parameter: - msg = ( - "Unexpected parameter format for {} {} property {}: {}. " - "Please consult the heat guidelines documentation for details." - ).format(resource_type, rid, properties, param) - invalid_parameters.append(msg) # should this be a failure? - continue - - # getting parameter if the get_param uses list, and getting official - # HEAT parameter type - parameter_type = parameter_type_to_heat_type(parameter) - if parameter_type == "comma_delimited_list": - parameter = parameter[0] - elif parameter_type != "string": - continue - - # checking parameter format = parameter type defined in parameters - # section - heat_parameter_type = nested_dict.get( - heat_parameters, parameter, "type" - ) - if not heat_parameter_type or heat_parameter_type != parameter_type: - msg = ( - "{} {} parameter {} defined as type {} " - + "is being used as type {} in the heat template" - ).format( - resource_type, - properties, - parameter, - heat_parameter_type, - parameter_type, + template_parameters = [] + if "str_replace" in param: + # print(param) + template_parameters.extend( + v + for k, v in nested_dict.get( + param, "str_replace", "params", default={} + ).items() ) - invalid_parameters.append(msg) # should this actually be an error? - continue - - if exemptions_allowed and parameter in get_aap_exemptions(resource): - continue - - # if parameter type is not in regx dict, then it is not supported - # by automation - regx_dict = regx[resource_intext].get(parameter_type) - if not regx_dict: - msg = ( - "{} {} {} parameter {} defined as type {} " - "which is required by platform data model for proper " - "assignment and inventory." - ).format(resource_type, rid, properties, parameter, parameter_type) - if exemptions_allowed: - msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) - invalid_parameters.append(msg) - continue - - # checking if param adheres to guidelines format - regexp = regx[resource_intext][parameter_type]["machine"] - readable_format = regx[resource_intext][parameter_type]["readable"] - match = regexp.match(parameter) - if not match: - msg = ( - "{} {} property {} parameter {} does not follow {} " - "format {} which is required by platform data model for proper " - "assignment and inventory." - ).format( + else: + template_parameters.append(param) + + invalid_template_parameters = [] + for template_parameter in template_parameters: + # Looping through each parameter to check + # the only case where there can be more than 1 is + # if using str_replace + msg = validate_port_parameter( resource_type, rid, properties, - parameter, + template_parameter, resource_intext, - readable_format, + resource, + regx, + port_match, + exemptions_allowed, ) - if exemptions_allowed: - msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) - invalid_parameters.append(msg) - continue - - # checking that parameter includes correct vm_type/network_role - parameter_checks = regx.get("parameter_to_resource_comparisons", []) - for check in parameter_checks: - resource_match = port_match.group(check) - if ( - resource_match - and not parameter.startswith(resource_match) - and parameter.find("_{}_".format(resource_match)) == -1 - ): - msg = ( - "{0} {1} property {2} parameter " - "{3} {4} does match resource {4} {5}" - ).format( - resource_type, - rid, - properties, - parameter, - check, - resource_match, - ) - invalid_parameters.append(msg) - continue + + if not msg: + # if we found a valid parameter then + # reset invalide_template_parameters + # and break out of loop + invalid_template_parameters = [] + break + else: + # haven't found a valid parameter yet + invalid_template_parameters.append(msg) + + invalid_parameters.extend(x for x in invalid_template_parameters) assert not invalid_parameters, "%s" % "\n".join(invalid_parameters) +def validate_port_parameter( + resource_type, + rid, + properties, + param, + resource_intext, + resource, + regx, + port_match, + exemptions_allowed, +): + """ + Performs 4 validations + + 1) param actually uses get_param + 2) parameter_type + network_type (internal/external) is a valid combination + 3) parameter format matches expected format from input dictionary + 4) the vm_type or network role from resource matches parameter + + If the parameter is present in the resource metadata + and exemptions are allowed, then the validation will be skipped. + """ + parameter = param.get("get_param") + if not parameter: + return ( + "Unexpected parameter format for {} {} property {}: {}. " + "Please consult the heat guidelines documentation for details." + ).format(resource_type, rid, properties, param) + + # getting parameter if the get_param uses list, and getting official + # HEAT parameter type + parameter_type = parameter_type_to_heat_type(parameter) + if parameter_type == "comma_delimited_list": + parameter = parameter[0] + elif parameter_type != "string": + return None + + if exemptions_allowed and parameter in get_aap_exemptions(resource): + return None + + # if parameter type is not in regx dict, then it is not supported + # by automation + regx_dict = regx[resource_intext].get(parameter_type) + if not regx_dict: + msg = ( + "{} {} {} parameter {} defined as type {} " + "which is required by platform data model for proper " + "assignment and inventory." + ).format(resource_type, rid, properties, parameter, parameter_type) + if exemptions_allowed: + msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) + return msg + + msg = validate_parameter_format( + regx, parameter_type, resource_intext, parameter, rid, exemptions_allowed + ) + if msg: + return msg + + # checking that parameter includes correct vm_type/network_role + parameter_checks = regx.get("parameter_to_resource_comparisons", []) + for check in parameter_checks: + msg = mismatch_resource_and_parameter_attribute( + check, port_match, parameter, rid + ) + if msg: + return msg + + return None + + +def validate_parameter_format( + regx, parameter_type, resource_intext, parameter, rid, exemptions_allowed +): + """Checks if a parameter format matches the expected format + from input format dictionary""" + msg = None + regexp = regx[resource_intext][parameter_type]["machine"] + readable_format = regx[resource_intext][parameter_type]["readable"] + match = regexp.match(parameter) + if not match: + msg = ( + "{} property parameter {} does not follow {} " + "format {} which is required by platform data model for proper " + "assignment and inventory." + ).format(rid, parameter, resource_intext, readable_format) + if exemptions_allowed: + msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) + + return msg + + +def mismatch_resource_and_parameter_attribute(check, resource_re_match, parameter, rid): + """Compares vm_type or network_role from resource + is the same as found in parameter""" + resource_match = resource_re_match.group(check) + if ( + resource_match + and not parameter.startswith(resource_match) + and parameter.find("_{}_".format(resource_match)) == -1 + ): + return ("{0} {1} does not match parameter {2} {1}").format( + rid, check, parameter + ) + + def get_list_of_ports_attached_to_nova_server(nova_server): networks_list = nova_server.get("properties", {}).get("networks") |