diff options
Diffstat (limited to 'check.py')
-rw-r--r-- | check.py | 511 |
1 files changed, 511 insertions, 0 deletions
diff --git a/check.py b/check.py new file mode 100644 index 0000000..eac897f --- /dev/null +++ b/check.py @@ -0,0 +1,511 @@ +# -*- coding: utf8 -*- +# org.onap.vnfrqts/requirements +# ============LICENSE_START==================================================== +# Copyright © 2018 AT&T Intellectual Property. All rights reserved. +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the "License"); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the "License"); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ + +""" +This script should be run before every commit to ensure proper +standards are being followed within the project. The script +will also automatically fix certain issues when they are encountered, and +warn about other issues it cannot automatically resolve. + +Warnings: +- Requirement missing required attributes +- Invalid values for attributes +- Invalid section header usage in any file +- :keyword: and requirement mismatch + +Auto Updates: +- Assigning :id: on new requirements where an ID missing +- Adding :introduced: attribute on new requirements +- Adding/correcting :updated: attribute on changed requirements +""" + +import os +import random +import re +import sys +from abc import ABC, abstractmethod +from collections import OrderedDict, deque +from pathlib import Path +from typing import Deque, List, Mapping, Callable, Set + +import requests + +THIS_DIR = Path(__file__).parent +CONF_PATH = THIS_DIR / "docs/conf.py" + +NEEDS_JSON_URL = ( + "https://nexus.onap.org/service/local/repositories/raw/content" + "/org.onap.vnfrqts.requirements/master/needs.json" +) + +HEADING_LEVELS = ("-", "^", "~", "+", "*", '"') + +SPACES = re.compile(r"\s+") +REQ_DIRECTIVE_PATTERN = re.compile(r"\.\.\s+req::.*") +ATTRIBUTE_PATTERN = re.compile(r"^\s+(:\w+:)\s+(.*)$") +VERSION_PATTERN = re.compile(r"version\s+=\s+'(.*?)'") + +VALID_KEYWORDS = ("MUST", "MUST NOT", "SHOULD", "SHOULD NOT", "MAY", "MAY NOT") +VALID_VERSIONS = ( + "amsterdam", + "beijing", + "casablanca", + "dublin", + "el alto", + "frankfurt", + "guilin", +) +REQUIRED_ATTRIBUTES = (":keyword:", ":target:", ":id:") +VALID_TARGETS = ( + "VNF", + "PNF", + "VNF or PNF", + "VNF DOCUMENTATION PACKAGE", + "PNF DOCUMENTATION PACKAGE", + "VNF or PNF DOCUMENTATION PACKAGE", + "VNF PROVIDER", + "PNF PROVIDER", + "VNF or PNF PROVIDER", + "VNF CSAR PACKAGE", + "PNF CSAR PACKAGE", + "VNF or PNF CSAR PACKAGE", + "VNF HEAT PACKAGE", +) +VALID_VALIDATION_MODES = ("static", "none", "in_service") + + +def check(predicate: bool, msg: str): + """ + Raises RuntimeError with given msg if predicate is False + """ + if not predicate: + raise RuntimeError(msg) + + +def get_version() -> str: + """ + Returns the version value from conf.py + """ + with open(CONF_PATH) as f: + for line in f: + m = VERSION_PATTERN.match(line) + if m: + version = m.groups()[0] + if version not in VALID_VERSIONS: + print( + f"ERROR: {version} in conf.py is not defined in " + f"VALID_VERSIONS. Update the script to continue" + ) + sys.exit(1) + return version + + +VERSION = get_version() + + +def normalize(text: str): + """ + Strips out formatting, line breaks, and repeated spaces to normalize + the string for comparison. This ensures minor formatting changes + are not tagged as meaningful changes + """ + s = text.lower() + s = s.replace("\n", " ") + s = re.sub(r'[`*\'"]', "", s) + s = re.sub(r"\s+", " ", s) + return s + + +def warn(path: str, msg: str, req: "RequirementDirective" = None): + """ + Log a warning + """ + req_id = req.requirement_id or "UNKNOWN" if req else "UNKNOWN" + print(f"WARNING: {path} | {req_id} | {msg}") + + +class RequirementRepository: + """ + Pulls needs.json and provides various options to interact with the data. + """ + + def __init__(self, data=None): + self.data = data or requests.get(NEEDS_JSON_URL).json() + self.all_ids = { + r["id"] + for version in self.data["versions"].values() + for r in version["needs"].values() + } + + @property + def current_requirements(self) -> Mapping: + """ + Returns the requirements specified by current_version in needs.json. + """ + version = self.data["current_version"] + return self.data["versions"][version]["needs"] + + @property + def unique_targets(self) -> Set[str]: + return {r["target"] for r in self.current_requirements.values()} + + @property + def unique_validation_modes(self) -> Set[str]: + return {r["validation_mode"] for r in self.current_requirements.values()} + + def create_id(self) -> str: + """ + Generates a requirements ID that has not been used in any version + of the requirements. + """ + while True: + new_id = "R-{:0>5d}".format(random.randint(0, 99999)) + if new_id in self.all_ids: + continue # skip this one and generate another one + self.all_ids.add(new_id) + return new_id + + def is_new_requirement(self, req: "RequirementDirective") -> bool: + return req.requirement_id not in self.current_requirements + + def has_changed(self, req: "RequirementDirective") -> bool: + """ + Returns True if the requirement already exists and the contents has + meaningfully changed. Small changes in spaces or formatting are not considered. + """ + current_req = self.current_requirements.get(req.requirement_id) + if not current_req: + return False + return normalize(current_req["description"]) == normalize("".join(req.content)) + + +class RequirementDirective: + """ + Data structure to hold a .. req:: directive + """ + + ATTRIBUTE_ORDER = ( + ":id:", + ":target:", + ":keyword:", + ":introduced:", + ":updated:", + ":validation_mode:", + ":impacts:", + ) + + def __init__(self, path: str): + self.path = path + self.attributes = OrderedDict() + self.content = [] + self.indent = None + + @property + def requirement_id(self) -> str: + return self.attributes.get(":id:", "") + + @requirement_id.setter + def requirement_id(self, r_id: str): + self._update(":id:", r_id) + + @property + def keyword(self) -> str: + return self.attributes.get(":keyword:", "") + + @keyword.setter + def keyword(self, k: str): + self._update(":keyword:", k) + + @property + def target(self) -> str: + return self.attributes.get(":target:", "") + + @target.setter + def target(self, value: str): + self._update(":target", value) + + @property + def introduced(self) -> str: + return self.attributes.get(":introduced:", "") + + @introduced.setter + def introduced(self, version: str): + self._update(":introduced:", version) + + @property + def updated(self) -> str: + return self.attributes.get(":updated:", "") + + @updated.setter + def updated(self, version: str): + self._update(":updated:", version) + + @property + def validation_mode(self) -> str: + return self.attributes.get(":validation_mode:", "") + + @validation_mode.setter + def validation_mode(self, value: str): + self._update(":validation_mode:", value) + + def parse(self, lines: Deque[str]): + """ + Parses a ..req:: directive and populates the data structre + """ + parsing_attrs = True + while lines: + line = lines.popleft() + match = ATTRIBUTE_PATTERN.match(line) if parsing_attrs else None + if match: + self.indent = self.indent or self.calc_indent(line) + attr, value = match.groups() + self.attributes[attr] = value + else: + parsing_attrs = False # passed attributes, capture content + if line.strip() and self.calc_indent(line) < self.indent: + # past end of the directive so we'll put this line back + lines.appendleft(line) + break + else: + self.content.append(line) + + def format_attributes(self) -> List[str]: + """ + Converts a dict back to properly indented lines using ATTRIBUTE_ORDER + """ + spaces = " " * self.indent + attr_lines = [] + for key in self.ATTRIBUTE_ORDER: + value = self.attributes.get(key) + if value: + attr_lines.append(f"{spaces}{key} {value}\n") + return attr_lines + + @staticmethod + def calc_indent(line: str) -> int: + """ + Number of leading spaces of the line + """ + return len(line) - len(line.lstrip()) + + def __str__(self): + return "".join(self.format_attributes() + self.content) + + def _notify(self, field, value): + req_id = self.requirement_id or "UNKNOWN" + print(f"UPDATE: {self.path} | {req_id} | Setting {field} to {value}") + + def _update(self, attr, value): + self.attributes[attr] = value + self._notify(attr, value) + + +class RequirementVisitor: + """ + Walks a directory for reStructuredText files and and passes contents to + visitors when the content is encountered. + + Types of visitors supported: + + - Requirement: Take the path and a RequirementDirective which may be modified + If modified, the file will be updated using the modified directive + - Post Processor: Take the path and all lines for processing; returning a + potentially changed set of lines + """ + + def __init__( + self, + req_visitors: List[Callable[[str, RequirementDirective], None]], + post_processors: List[Callable[[str, List[str]], List[str]]], + ): + self.req_visitors = req_visitors or [] + self.post_processors = post_processors or [] + + def process(self, root_dir: Path): + """ + Walks the `root_dir` looking for rst to files to parse + """ + for dir_path, sub_dirs, filenames in os.walk(root_dir.as_posix()): + for filename in filenames: + if filename.lower().endswith(".rst"): + self.handle_rst_file(os.path.join(dir_path, filename)) + + @staticmethod + def read(path): + """Read file at `path` and return lines as list""" + with open(path, "r") as f: + return list(f) + + @staticmethod + def write(path, content): + """Write a content to the given path""" + with open(path, "w") as f: + for line in content: + f.write(line) + + def handle_rst_file(self, path): + """ + Parse the RST file notifying the registered visitors + """ + lines = deque(self.read(path)) + new_contents = [] + while lines: + line = lines.popleft() + if self.is_req_directive(line): + req = RequirementDirective(path) + req.parse(lines) + for func in self.req_visitors: + func(path, req) + # Put the lines back for processing by the line visitor + lines.extendleft(reversed(req.format_attributes() + req.content)) + new_contents.append(line) + for processor in self.post_processors: + new_contents = processor(path, new_contents) or new_contents + self.write(path, new_contents) + + @staticmethod + def is_req_directive(line): + """Returns True if the line denotes the start of a needs directive""" + return bool(REQ_DIRECTIVE_PATTERN.match(line)) + + +class AbstractRequirementVisitor(ABC): + @abstractmethod + def __call__(self, path: str, req: RequirementDirective): + raise NotImplementedError() + + +class MetadataFixer(AbstractRequirementVisitor): + """ + Updates metadata based on the status of the requirement and contents of + the metadata + """ + + def __init__(self, repos: RequirementRepository): + self.repos = repos + + def __call__(self, path: str, req: RequirementDirective): + if not req.requirement_id: + req.requirement_id = self.repos.create_id() + if self.repos.is_new_requirement(req) and req.introduced != VERSION: + req.introduced = VERSION + if self.repos.has_changed(req) and req.updated != VERSION: + req.updated = VERSION + + +class MetadataValidator(AbstractRequirementVisitor): + def __init__(self, repos: RequirementRepository): + self.repos = repos + + def __call__(self, path: str, req: RequirementDirective): + for attr in REQUIRED_ATTRIBUTES: + if attr not in req.attributes: + warn(path, f"Missing required attribute {attr}", req) + if req.keyword and req.keyword not in VALID_KEYWORDS: + warn(path, f"Invalid :keyword: value ({req.keyword})", req) + if repository.is_new_requirement(req) and req.introduced != VERSION: + warn(path, f":introduced: is not {VERSION} on new requirement", req) + if req.introduced and req.introduced not in VALID_VERSIONS: + warn(path, f"Invalid :introduced: value ({req.introduced})", req) + if req.updated and req.updated not in VALID_VERSIONS: + warn(path, f"Invalid :updated: value ({req.updated})", req) + if req.target and req.target not in VALID_TARGETS: + warn(path, f"Invalid :target: value ({req.target})", req) + if req.validation_mode and req.validation_mode not in VALID_VALIDATION_MODES: + warn(path, f"Invalid :validation_mode: value ({req.validation_mode})", req) + + +def check_section_headers(path: str, lines: List[str]) -> List[str]: + """ + Ensure hierarchy of section headers follows the expected progression as defined + by `HEADING_LEVELS`, and that section heading marks match the length of the + section title. + """ + current_heading_level = 0 + for i, line in enumerate(lines): + if any(line.startswith(char * 3) for char in HEADING_LEVELS): + # heading level should go down, stay the same, or be next level + expected = HEADING_LEVELS[0 : current_heading_level + 2] + if line[0] not in expected: + warn( + path, + f"Unexpected heading char ({line[0]}) on line {i+1}. " + f"Expected one of {' '.join(expected)}", + ) + if len(line.strip()) != len(lines[i - 1].strip()): + lines[i] = (line[0] * len(lines[i - 1].strip())) + "\n" + print( + f"UPDATE: {path} | Matching section mark to title length " + f"on line {i+1}" + ) + current_heading_level = HEADING_LEVELS.index(line[0]) + return lines + + +def check_keyword_text_alignment(path: str, req: RequirementDirective): + if not req.keyword: + return req + keyword = f"**{req.keyword}**" + if not any(keyword in line for line in req.content): + warn(path, f"Keyword is {req.keyword}, but {keyword} not in requirement", req) + + +if __name__ == "__main__": + print("Valid Versions") + print("-----------------------") + print("\n".join(VALID_VERSIONS)) + print() + print("Valid Keywords") + print("-----------------------") + print("\n".join(VALID_KEYWORDS)) + print() + print("Valid Targets") + print("-----------------------") + print("\n".join(VALID_TARGETS)) + print() + print("Valid Validation Modes") + print("-----------------------") + print("\n".join(VALID_VALIDATION_MODES)) + print() + print("Check-up Report") + print("-" * 100) + repository = RequirementRepository() + visitor = RequirementVisitor( + req_visitors=[ + MetadataFixer(repository), + MetadataValidator(repository), + check_keyword_text_alignment, + ], + post_processors=[check_section_headers], + ) + visitor.process(THIS_DIR / "docs") |