aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/tests/conftest.py
diff options
context:
space:
mode:
authorLovett, Trevor (tl2972) <tl2972@att.com>2018-08-22 11:13:45 -0500
committerLovett, Trevor (tl2972) <tl2972@att.com>2018-08-22 12:52:32 -0500
commitf5edc06be0d8bedeb0904b348ba5e3e67c74f186 (patch)
tree5fba01ca97aebe14f4b7004551bc58c17a858169 /ice_validator/tests/conftest.py
parent1af0d577ab6d8c431ae1322657c50efd5e0a1a93 (diff)
[VVP] Added new three new reports
Resolves VVP-81 by introducing three new reports to the validation script output. Each report is written to the output directory under ice_validator. * mapping_errors.csv - each row represents an issue where a test is mapped to a requirement that does not exist in the requirement.json file * traceability.csv - Shows a mapping between all test cases and requirements. Rows will also be shown where a requirement does not have a tests, and where tests are not mapped to a requirement * report.{html|csv|xlsx} - Formatted error report that shows each failure that was detected and maps it to a the file name, requirement(s) violated if known, possible resolution steps, error message, and raw output from pytest. The report can be written in html, csv, or Excel format Other minor changes: * Replaced smart quotes with regular quotes to resolve issues with py3 * Updated copyright of modified files * Fixed Flake-8 errors Issue-ID: VVP-81 Change-Id: I659836cb20262ae44652c03639281a817dd051ae Signed-off-by: Lovett, Trevor (tl2972) <tl2972@att.com>
Diffstat (limited to 'ice_validator/tests/conftest.py')
-rw-r--r--ice_validator/tests/conftest.py539
1 files changed, 494 insertions, 45 deletions
diff --git a/ice_validator/tests/conftest.py b/ice_validator/tests/conftest.py
index 84429cf..09baa9a 100644
--- a/ice_validator/tests/conftest.py
+++ b/ice_validator/tests/conftest.py
@@ -2,11 +2,11 @@
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
+# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -21,7 +21,7 @@
#
#
# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -34,29 +34,377 @@
# limitations under the License.
#
# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-#
+import collections
+import csv
+import datetime
+import hashlib
+import io
+import json
import os
+import sys
+import time
+import docutils.core
+import pytest
+from more_itertools import partition
+from six import string_types
+import xlsxwriter
__path__ = [os.path.dirname(os.path.abspath(__file__))]
+resolution_steps_file = "resolution_steps.json"
+requirements_file = "requirements.json"
+
+FAILURE_DATA = {}
+
+report_columns = [
+ ("Input File", "file"),
+ ("Test", "test_file"),
+ ("Requirements", "req_description"),
+ ("Resolution Steps", "resolution_steps"),
+ ("Error Message", "message"),
+ ("Raw Test Output", "raw_output"),
+]
+report = collections.OrderedDict(report_columns)
+
+
+def extract_error_msg(rep):
+ msg = str(rep.longrepr.reprcrash)
+ if "AssertionError:" in msg:
+ return msg.split("AssertionError:")[1]
+ else:
+ return msg
+
+
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+
+ outcome = yield
+ rep = outcome.get_result()
+
+ output_dir = "{}/../output".format(__path__[0])
+ if rep.outcome == "failed":
+ if not os.path.exists(output_dir):
+ os.mkdir(output_dir)
+
+ if hasattr(item.function, "requirement_ids"):
+ requirement_ids = item.function.requirement_ids
+ else:
+ requirement_ids = ""
+
+ if "environment_pair" in item.fixturenames:
+ resolved_pair = "{} environment pair".format(
+ item.funcargs["environment_pair"]["name"]
+ )
+ elif "heat_volume_pair" in item.fixturenames:
+ resolved_pair = "{} volume pair".format(
+ item.funcargs["heat_volume_pair"]["name"]
+ )
+ elif "heat_templates" in item.fixturenames:
+ resolved_pair = item.funcargs["heat_templates"]
+ elif "yaml_files" in item.fixturenames:
+ resolved_pair = item.funcargs["yaml_files"]
+ else:
+ resolved_pair = rep.nodeid.split("[")[1][:-1]
+
+ FAILURE_DATA[len(FAILURE_DATA)] = {
+ "file": resolved_pair,
+ "vnfrqts": requirement_ids,
+ "test": item.function.__name__,
+ "test_file": item.function.__module__.split(".")[-1],
+ "raw_output": str(rep.longrepr),
+ "message": extract_error_msg(rep),
+ }
+
+ with open("{}/failures".format(output_dir), "w") as f:
+ json.dump(FAILURE_DATA, f, indent=4)
+
+
+def make_timestamp():
+ timezone = time.tzname[time.localtime().tm_isdst]
+ return "{} {}".format(str(datetime.datetime.now()), timezone)
+
+
+def pytest_sessionfinish(session, exitstatus):
+ if not session.config.option.template_dir:
+ return
+ template_path = os.path.abspath(session.config.option.template_dir[0])
+ profile_name = session.config.option.validation_profile_name
+ generate_report(
+ "{}/../output".format(__path__[0]),
+ template_path,
+ profile_name,
+ session.config.option.report_format,
+ )
+
+
+def pytest_runtest_setup(item):
+ profile = item.session.config.option.validation_profile
+ markers = set(m.name for m in item.iter_markers())
+ if not profile and markers:
+ pytest.skip("No validation profile selected. Skipping tests with marks.")
+ if profile and markers and profile not in markers:
+ pytest.skip("Doesn't match selection validation profile")
+
+
+def make_href(path):
+ paths = [path] if isinstance(path, string_types) else path
+ links = []
+ for p in paths:
+ abs_path = os.path.abspath(p)
+ filename = os.path.split(abs_path)[1]
+ links.append(
+ "<a href='file://{abs_path}' target='_blank'>{filename}</a>".format(
+ abs_path=abs_path, filename=filename
+ )
+ )
+ return "<br/>".join(links)
+
+
+def generate_report(outpath, template_path, profile_name, output_format):
+ failures = "{}/failures".format(outpath)
+ faildata = None
+ rdata = None
+ hdata = None
+
+ if os.path.exists(failures):
+ with open(failures, "r") as f:
+ faildata = json.loads(f.read())
+ else:
+ faildata = {}
+
+ resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
+ if os.path.exists(resolution_steps):
+ with open(resolution_steps, "r") as f:
+ rdata = json.loads(f.read())
+
+ heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
+ if os.path.exists(heat_requirements):
+ with open(heat_requirements, "r") as f:
+ hdata = json.loads(f.read())
+
+ # point requirements at the most recent version
+ current_version = hdata["current_version"]
+ hdata = hdata["versions"][current_version]["needs"]
+ # mapping requirement IDs from failures to requirement descriptions
+ for k, v in faildata.items():
+ req_text = ""
+ if v["vnfrqts"] != "":
+ for req in v["vnfrqts"]:
+ if req in hdata:
+ req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
+ faildata[k]["req_description"] = req_text
+
+ # mapping resolution steps to module and test name
+ for k, v in faildata.items():
+ faildata[k]["resolution_steps"] = ""
+ for rs in rdata:
+ if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
+ faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
+ rs["header"], rs["resolution_steps"]
+ )
+ output_format = output_format.lower().strip() if output_format else "html"
+ if output_format == "html":
+ generate_html_report(outpath, profile_name, template_path, faildata)
+ elif output_format == "excel":
+ generate_excel_report(outpath, profile_name, template_path, faildata)
+ elif output_format == "csv":
+ generate_csv_report(outpath, profile_name, template_path, faildata)
+ else:
+ raise ValueError("Unsupported output format: " + output_format)
+
+
+def generate_csv_report(output_dir, profile_name, template_path, faildata):
+ rows = []
+ rows.append(["Validation Failures"])
+ headers = [
+ ("Profile Selected:", profile_name),
+ ("Report Generated At:", make_timestamp()),
+ ("Directory Validated:", template_path),
+ ("Checksum:", hash_directory(template_path)),
+ ("Total Errors:", len(faildata)),
+ ]
+
+ rows.append([])
+ for header in headers:
+ rows.append(header)
+ rows.append([])
+
+ # table header
+ rows.append([col for col, _ in report_columns])
+
+ # table content
+ for data in faildata.values():
+ rows.append(
+ [
+ data.get("file", ""),
+ data.get("test_file", ""),
+ data.get("req_description", ""),
+ data.get("resolution_steps", ""),
+ data.get("message", ""),
+ data.get("raw_output", ""),
+ ]
+ )
+
+ output_path = os.path.join(output_dir, "report.csv")
+ with open(output_path, "w", newline="") as f:
+ writer = csv.writer(f)
+ for row in rows:
+ writer.writerow(row)
+
+
+def generate_excel_report(output_dir, profile_name, template_path, faildata):
+ output_path = os.path.join(output_dir, "report.xlsx")
+ workbook = xlsxwriter.Workbook(output_path)
+ bold = workbook.add_format({"bold": True})
+ code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
+ normal = workbook.add_format({"text_wrap": True})
+ heading = workbook.add_format({"bold": True, "font_size": 18})
+ worksheet = workbook.add_worksheet("failures")
+ worksheet.write(0, 0, "Validation Failures", heading)
+
+ headers = [
+ ("Profile Selected:", profile_name),
+ ("Report Generated At:", make_timestamp()),
+ ("Directory Validated:", template_path),
+ ("Checksum:", hash_directory(template_path)),
+ ("Total Errors:", len(faildata)),
+ ]
+ for row, (header, value) in enumerate(headers, start=2):
+ worksheet.write(row, 0, header, bold)
+ worksheet.write(row, 1, value)
+
+ worksheet.set_column(0, len(headers) - 1, 40)
+ worksheet.set_column(len(headers), len(headers), 80)
+
+ # table header
+ start_error_table_row = 2 + len(headers) + 2
+ for col_num, (col_name, _) in enumerate(report_columns):
+ worksheet.write(start_error_table_row, col_num, col_name, bold)
+
+ # table content
+ for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
+ for col, key in enumerate(report.values()):
+ if key == "file":
+ paths = (
+ [data[key]] if isinstance(data[key], string_types) else data[key]
+ )
+ contents = "\n".join(paths)
+ worksheet.write(row, col, contents, normal)
+ elif key == "raw_output":
+ worksheet.write_string(row, col, data[key], code)
+ else:
+ worksheet.write(row, col, data[key], normal)
+
+ workbook.close()
+
+
+def generate_html_report(outpath, profile_name, template_path, faildata):
+ with open("{}/report.html".format(outpath), "w") as of:
+ body_begin = """
+ <style type="text/css">
+ h1, li {{
+ font-family:Arial, sans-serif;
+ }}
+ .tg {{border-collapse:collapse;border-spacing:0;}}
+ .tg td{{font-family:Arial, sans-serif;font-size:8px;padding:10px 5px;
+ border-style:solid;border-width:1px;overflow:hidden;word-break:normal;
+ border-color:black;}}
+ .tg th{{font-family:Arial, sans-serif;font-size:14px;font-weight:normal;
+ padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;
+ word-break:normal;border-color:black;}}
+ .tg .tg-rwj1{{font-size:10px;font-family:Arial, Helvetica,
+ sans-serif !important;;border-color:inherit;vertical-align:top}}</style>
+ <h1>Validation Failures</h1>
+ <ul>
+ <li><b>Profile Selected: </b> <tt>{profile}</tt></li>
+ <li><b>Report Generated At:</b> <tt>{timestamp}</tt></li>
+ <li><b>Directory Validated:</b> <tt>{template_dir}</tt></li>
+ <li><b>Checksum:</b> <tt>{checksum}</tt></li>
+ <li><b>Total Errors:</b> {num_failures}</li>
+ </ul>
+ """.format(
+ profile=profile_name,
+ timestamp=make_timestamp(),
+ checksum=hash_directory(template_path),
+ template_dir=template_path,
+ num_failures=len(faildata),
+ )
+ of.write(body_begin)
+
+ if len(faildata) == 0:
+ of.write("<p>Success! No validation failures detected.</p>")
+ return
+
+ table_begin = '<table class="tg">'
+ of.write(table_begin)
+
+ # table headers
+ of.write("<tr>")
+ for k, v in report.items():
+ of.write('<th class="tg-rwj1">{}</th>'.format(k))
+ of.write("</tr>")
+
+ # table content
+ for k, v in faildata.items():
+ of.write("<tr>")
+ for rk, rv in report.items():
+ if rv == "file":
+ value = make_href(v[rv])
+ elif rv == "raw_output":
+ value = "<pre>{}</pre>".format(v[rv])
+ elif rv == "req_description":
+ parts = docutils.core.publish_parts(
+ writer_name="html", source=v[rv]
+ )
+ value = parts["body"]
+ else:
+ value = v[rv].replace("\n", "<br />")
+ of.write(" <td>{}</td>".format(value))
+ of.write("</tr>")
+
+ of.write("</table>")
+
def pytest_addoption(parser):
"""
Add needed CLI arguments
"""
- parser.addoption("--template-directory",
- dest="template_dir",
- action="append",
- help="Directory which holds the templates for validation")
+ parser.addoption(
+ "--template-directory",
+ dest="template_dir",
+ action="append",
+ help="Directory which holds the templates for validation",
+ )
+
+ parser.addoption(
+ "--self-test",
+ dest="self_test",
+ action="store_true",
+ help="Test the unit tests against their fixtured data",
+ )
+
+ parser.addoption(
+ "--validation-profile",
+ dest="validation_profile",
+ action="store",
+ help="Runs all unmarked tests plus test with a matching marker",
+ )
- parser.addoption("--self-test",
- dest="self_test",
- action='store_true',
- help="Test the unit tests against their fixtured data")
+ parser.addoption(
+ "--validation-profile-name",
+ dest="validation_profile_name",
+ action="store",
+ help="Friendly name of the validation profile used in reports",
+ )
+
+ parser.addoption(
+ "--report-format",
+ dest="report_format",
+ action="store",
+ help="Format of output report (html, csv, excel)",
+ )
def pytest_configure(config):
@@ -64,12 +412,14 @@ def pytest_configure(config):
Ensure that we are receive either `--self-test` or
`--template-dir=<directory` as CLI arguments
"""
- if config.getoption('template_dir') and config.getoption('self_test'):
- raise Exception(('"--template-dir", and "--self-test"'
- ' are mutually exclusive'))
- if not (config.getoption('template_dir') or config.getoption('self_test')):
- raise Exception(('One of "--template-dir" or'
- ' "--self-test" must be specified'))
+ if config.getoption("template_dir") and config.getoption("self_test"):
+ raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
+ if not (
+ config.getoption("template_dir") or
+ config.getoption("self_test") or
+ config.getoption("help")
+ ):
+ raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
def pytest_generate_tests(metafunc):
@@ -80,81 +430,180 @@ def pytest_generate_tests(metafunc):
is not specified on the CLI, the fixtures associated with this
test name.
"""
- if 'filename' in metafunc.fixturenames:
+ if "filename" in metafunc.fixturenames:
from .parametrizers import parametrize_filename
+
parametrize_filename(metafunc)
- if 'filenames' in metafunc.fixturenames:
+ if "filenames" in metafunc.fixturenames:
from .parametrizers import parametrize_filenames
+
parametrize_filenames(metafunc)
- if 'template_dir' in metafunc.fixturenames:
+ if "template_dir" in metafunc.fixturenames:
from .parametrizers import parametrize_template_dir
+
parametrize_template_dir(metafunc)
- if 'environment_pair' in metafunc.fixturenames:
+ if "environment_pair" in metafunc.fixturenames:
from .parametrizers import parametrize_environment_pair
+
parametrize_environment_pair(metafunc)
- if 'heat_volume_pair' in metafunc.fixturenames:
+ if "heat_volume_pair" in metafunc.fixturenames:
from .parametrizers import parametrize_heat_volume_pair
+
parametrize_heat_volume_pair(metafunc)
- if 'yaml_files' in metafunc.fixturenames:
+ if "yaml_files" in metafunc.fixturenames:
from .parametrizers import parametrize_yaml_files
+
parametrize_yaml_files(metafunc)
- if 'env_files' in metafunc.fixturenames:
+ if "env_files" in metafunc.fixturenames:
from .parametrizers import parametrize_environment_files
+
parametrize_environment_files(metafunc)
- if 'yaml_file' in metafunc.fixturenames:
+ if "yaml_file" in metafunc.fixturenames:
from .parametrizers import parametrize_yaml_file
+
parametrize_yaml_file(metafunc)
- if 'env_file' in metafunc.fixturenames:
+ if "env_file" in metafunc.fixturenames:
from .parametrizers import parametrize_environment_file
+
parametrize_environment_file(metafunc)
- if 'parsed_yaml_file' in metafunc.fixturenames:
+ if "parsed_yaml_file" in metafunc.fixturenames:
from .parametrizers import parametrize_parsed_yaml_file
+
parametrize_parsed_yaml_file(metafunc)
- if 'parsed_environment_file' in metafunc.fixturenames:
+ if "parsed_environment_file" in metafunc.fixturenames:
from .parametrizers import parametrize_parsed_environment_file
+
parametrize_parsed_environment_file(metafunc)
- if 'heat_template' in metafunc.fixturenames:
+ if "heat_template" in metafunc.fixturenames:
from .parametrizers import parametrize_heat_template
+
parametrize_heat_template(metafunc)
- if 'heat_templates' in metafunc.fixturenames:
+ if "heat_templates" in metafunc.fixturenames:
from .parametrizers import parametrize_heat_templates
+
parametrize_heat_templates(metafunc)
- if 'volume_template' in metafunc.fixturenames:
+ if "volume_template" in metafunc.fixturenames:
from .parametrizers import parametrize_volume_template
+
parametrize_volume_template(metafunc)
- if 'volume_templates' in metafunc.fixturenames:
+ if "volume_templates" in metafunc.fixturenames:
from .parametrizers import parametrize_volume_templates
+
parametrize_volume_templates(metafunc)
- if 'template' in metafunc.fixturenames:
+ if "template" in metafunc.fixturenames:
from .parametrizers import parametrize_template
+
parametrize_template(metafunc)
- if 'templates' in metafunc.fixturenames:
+ if "templates" in metafunc.fixturenames:
from .parametrizers import parametrize_templates
+
parametrize_templates(metafunc)
+def hash_directory(path):
+ md5 = hashlib.md5()
+ for dir_path, sub_dirs, filenames in os.walk(path):
+ for filename in filenames:
+ file_path = os.path.join(dir_path, filename)
+ with open(file_path, "rb") as f:
+ md5.update(f.read())
+ return md5.hexdigest()
+
+
+def load_current_requirements():
+ """Loads dict of current requirements or empty dict if file doesn't exist"""
+ path = "requirements.json"
+ if not os.path.exists(path):
+ return {}
+ with io.open(path, encoding="utf8", mode="r") as f:
+ data = json.load(f)
+ version = data["current_version"]
+ return data["versions"][version]["needs"]
+
+
+def compat_open(path):
+ """Invokes open correctly depending on the Python version"""
+ if sys.version_info.major < 3:
+ return open(path, "wb")
+ else:
+ return open(path, "w", newline="")
+
+
+def unicode_writerow(writer, row):
+ if sys.version_info.major < 3:
+ row = [s.encode("utf8") for s in row]
+ writer.writerow(row)
+
+
def pytest_report_collectionfinish(config, startdir, items):
- """Generates a simple traceability report to sysout"""
- print("Traceability Report")
- print("==================================================================")
- for item in items:
- if hasattr(item.function, "requirement_ids"):
- for req_id in item.function.requirement_ids:
- print(req_id + "," + item.function.__module__ + ","
- + item.function.__name__)
+ """Generates a simple traceability report to output/traceability.csv"""
+ traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
+ output_dir = os.path.split(traceability_path)[0]
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir)
+ requirements = load_current_requirements()
+ unmapped, mapped = partition(
+ lambda item: hasattr(item.function, "requirement_ids"), items
+ )
+
+ req_to_test = collections.defaultdict(set)
+ mapping_errors = set()
+ for item in mapped:
+ for req_id in item.function.requirement_ids:
+ req_to_test[req_id].add(item)
+ if req_id not in requirements:
+ mapping_errors.add(
+ (req_id, item.function.__module__, item.function.__name__)
+ )
+
+ mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
+ with compat_open(mapping_error_path) as f:
+ writer = csv.writer(f)
+ for error in mapping_errors:
+ unicode_writerow(writer, error)
+
+ with compat_open(traceability_path) as f:
+ out = csv.writer(f)
+ unicode_writerow(
+ out,
+ ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
+ )
+ for req_id, metadata in requirements.items():
+ if req_to_test[req_id]:
+ for item in req_to_test[req_id]:
+ unicode_writerow(
+ out,
+ (
+ req_id,
+ metadata["description"],
+ metadata["section_name"],
+ item.function.__module__,
+ item.function.__name__,
+ ),
+ )
+ else:
+ unicode_writerow(
+ out,
+ (req_id, metadata["description"], metadata["section_name"], "", ""),
+ )
+ # now write out any test methods that weren't mapped to requirements
+ for item in unmapped:
+ unicode_writerow(
+ out, ("", "", "", item.function.__module__, item.function.__name__)
+ )