# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
# https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================
import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time
import docutils.core
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter
__path__ = [os.path.dirname(os.path.abspath(__file__))]
resolution_steps_file = "resolution_steps.json"
requirements_file = "requirements.json"
FAILURE_DATA = {}
report_columns = [
("Input File", "file"),
("Test", "test_file"),
("Requirements", "req_description"),
("Resolution Steps", "resolution_steps"),
("Error Message", "message"),
("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)
def extract_error_msg(rep):
try:
msg = str(rep.longrepr.reprcrash)
except AttributeError:
msg = str(rep)
if "AssertionError:" in msg:
return msg.split("AssertionError:")[1]
else:
return msg
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
outcome = yield
rep = outcome.get_result()
output_dir = "{}/../output".format(__path__[0])
if rep.outcome == "failed":
if not os.path.exists(output_dir):
os.mkdir(output_dir)
if hasattr(item.function, "requirement_ids"):
requirement_ids = item.function.requirement_ids
else:
requirement_ids = ""
if "environment_pair" in item.fixturenames:
resolved_pair = "{} environment pair".format(
item.funcargs["environment_pair"]["name"]
)
elif "heat_volume_pair" in item.fixturenames:
resolved_pair = "{} volume pair".format(
item.funcargs["heat_volume_pair"]["name"]
)
elif "heat_templates" in item.fixturenames:
resolved_pair = item.funcargs["heat_templates"]
elif "yaml_files" in item.fixturenames:
resolved_pair = item.funcargs["yaml_files"]
else:
resolved_pair = rep.nodeid.split("[")[1][:-1]
FAILURE_DATA[len(FAILURE_DATA)] = {
"file": resolved_pair,
"vnfrqts": requirement_ids,
"test": item.function.__name__,
"test_file": item.function.__module__.split(".")[-1],
"raw_output": str(rep.longrepr),
"message": extract_error_msg(rep),
}
with open("{}/failures".format(output_dir), "w") as f:
json.dump(FAILURE_DATA, f, indent=4)
def make_timestamp():
timezone = time.tzname[time.localtime().tm_isdst]
return "{} {}".format(str(datetime.datetime.now()), timezone)
def pytest_sessionfinish(session, exitstatus):
if not session.config.option.template_dir:
return
template_path = os.path.abspath(session.config.option.template_dir[0])
profile_name = session.config.option.validation_profile_name
generate_report(
"{}/../output".format(__path__[0]),
template_path,
profile_name,
session.config.option.report_format,
)
def pytest_runtest_setup(item):
profile = item.session.config.option.validation_profile
markers = set(m.name for m in item.iter_markers())
if not profile and markers and "xfail" not in markers:
pytest.skip("No validation profile selected. Skipping tests with marks.")
if profile and markers and profile not in markers and "xfail" not in markers:
pytest.skip("Doesn't match selection validation profile")
def make_href(path):
paths = [path] if isinstance(path, string_types) else path
links = []
for p in paths:
abs_path = os.path.abspath(p)
filename = os.path.split(abs_path)[1]
links.append(
"{filename}".format(
abs_path=abs_path, filename=filename
)
)
return "
".join(links)
def generate_report(outpath, template_path, profile_name, output_format):
failures = "{}/failures".format(outpath)
faildata = None
rdata = None
hdata = None
if os.path.exists(failures):
with open(failures, "r") as f:
faildata = json.loads(f.read())
else:
faildata = {}
resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
if os.path.exists(resolution_steps):
with open(resolution_steps, "r") as f:
rdata = json.loads(f.read())
heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
if os.path.exists(heat_requirements):
with open(heat_requirements, "r") as f:
hdata = json.loads(f.read())
# point requirements at the most recent version
current_version = hdata["current_version"]
hdata = hdata["versions"][current_version]["needs"]
# mapping requirement IDs from failures to requirement descriptions
for k, v in faildata.items():
req_text = ""
if v["vnfrqts"] != "":
for req in v["vnfrqts"]:
if req in hdata:
req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
faildata[k]["req_description"] = req_text
# mapping resolution steps to module and test name
for k, v in faildata.items():
faildata[k]["resolution_steps"] = ""
for rs in rdata:
if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
rs["header"], rs["resolution_steps"]
)
output_format = output_format.lower().strip() if output_format else "html"
if output_format == "html":
generate_html_report(outpath, profile_name, template_path, faildata)
elif output_format == "excel":
generate_excel_report(outpath, profile_name, template_path, faildata)
elif output_format == "csv":
generate_csv_report(outpath, profile_name, template_path, faildata)
else:
raise ValueError("Unsupported output format: " + output_format)
def generate_csv_report(output_dir, profile_name, template_path, faildata):
rows = []
rows.append(["Validation Failures"])
headers = [
("Profile Selected:", profile_name),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
("Total Errors:", len(faildata)),
]
rows.append([])
for header in headers:
rows.append(header)
rows.append([])
# table header
rows.append([col for col, _ in report_columns])
# table content
for data in faildata.values():
rows.append(
[
data.get("file", ""),
data.get("test_file", ""),
data.get("req_description", ""),
data.get("resolution_steps", ""),
data.get("message", ""),
data.get("raw_output", ""),
]
)
output_path = os.path.join(output_dir, "report.csv")
with open(output_path, "w", newline="") as f:
writer = csv.writer(f)
for row in rows:
writer.writerow(row)
def generate_excel_report(output_dir, profile_name, template_path, faildata):
output_path = os.path.join(output_dir, "report.xlsx")
workbook = xlsxwriter.Workbook(output_path)
bold = workbook.add_format({"bold": True})
code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
normal = workbook.add_format({"text_wrap": True})
heading = workbook.add_format({"bold": True, "font_size": 18})
worksheet = workbook.add_worksheet("failures")
worksheet.write(0, 0, "Validation Failures", heading)
headers = [
("Profile Selected:", profile_name),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
("Total Errors:", len(faildata)),
]
for row, (header, value) in enumerate(headers, start=2):
worksheet.write(row, 0, header, bold)
worksheet.write(row, 1, value)
worksheet.set_column(0, len(headers) - 1, 40)
worksheet.set_column(len(headers), len(headers), 80)
# table header
start_error_table_row = 2 + len(headers) + 2
for col_num, (col_name, _) in enumerate(report_columns):
worksheet.write(start_error_table_row, col_num, col_name, bold)
# table content
for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
for col, key in enumerate(report.values()):
if key == "file":
paths = (
[data[key]] if isinstance(data[key], string_types) else data[key]
)
contents = "\n".join(paths)
worksheet.write(row, col, contents, normal)
elif key == "raw_output":
worksheet.write_string(row, col, data[key], code)
else:
worksheet.write(row, col, data[key], normal)
workbook.close()
def generate_html_report(outpath, profile_name, template_path, faildata):
with open("{}/report.html".format(outpath), "w") as of:
body_begin = """
Success! No validation failures detected.
") return table_begin = '{} | '.format(k)) of.write("
---|
{} | ".format(value)) of.write("