Diffstat (limited to 'src/onapsdk/cds/data_dictionary.py')
-rw-r--r-- | src/onapsdk/cds/data_dictionary.py | 266 |
1 file changed, 266 insertions, 0 deletions
diff --git a/src/onapsdk/cds/data_dictionary.py b/src/onapsdk/cds/data_dictionary.py
new file mode 100644
index 0000000..b4d8d0e
--- /dev/null
+++ b/src/onapsdk/cds/data_dictionary.py
@@ -0,0 +1,266 @@
+"""CDS data dictionary module."""
+# Copyright 2022 Orange, Deutsche Telekom AG
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from logging import getLogger, Logger
+
+from onapsdk.exceptions import FileError, ValidationError
+
+from .cds_element import CdsElement
+
+
+class DataDictionary(CdsElement):
+    """Data dictionary class."""
+
+    logger: Logger = getLogger(__name__)
+
+    def __init__(self, data_dictionary_json: dict, fix_schema: bool = True) -> None:
+        """Initialize data dictionary.
+
+        Args:
+            data_dictionary_json (dict): data dictionary JSON
+            fix_schema (bool, optional): determines if the data dictionary should be fixed
+                when an invalid schema is detected. Fixing can raise ValidationError if the
+                dictionary is invalid. Defaults to True.
+
+        """
+        super().__init__()
+        self.data_dictionary_json: dict = data_dictionary_json
+        if not self.has_valid_schema() and fix_schema:
+            self.fix_schema()
+
+    def __hash__(self) -> int:  # noqa: D401
+        """Data dictionary object hash.
+
+        Based on the data dictionary name.
+
+        Returns:
+            int: Data dictionary hash
+
+        """
+        return hash(self.name)
+
+    def __eq__(self, dd: "DataDictionary") -> bool:
+        """Compare two data dictionaries.
+
+        Data dictionaries are equal if they have the same name.
+
+        Args:
+            dd (DataDictionary): Object to compare with.
+
+        Returns:
+            bool: True if objects have the same name, False otherwise.
+
+        """
+        return self.name == dd.name
+
+    def __repr__(self) -> str:
+        """Representation of object.
+
+        Returns:
+            str: Object's string representation
+
+        """
+        return f'DataDictionary[name: "{self.name}"]'
+
+    @property
+    def name(self) -> str:  # noqa: D401
+        """Data dictionary name.
+
+        Returns:
+            str: Data dictionary name
+
+        """
+        return self.data_dictionary_json["name"]
+
+    @property
+    def url(self) -> str:
+        """URL to call.
+
+        Returns:
+            str: CDS dictionary API URL
+
+        """
+        return f"{self._url}/api/v1/dictionary"
+
+    @classmethod
+    def get_by_name(cls, name: str) -> "DataDictionary":
+        """Get data dictionary by the provided name.
+
+        Returns:
+            DataDictionary: Data dictionary object with the given name
+
+        """
+        cls.logger.debug("Get CDS data dictionary with %s name", name)
+        return DataDictionary(
+            data_dictionary_json=cls.send_message_json(
+                "GET",
+                f"Get {name} CDS data dictionary",
+                f"{cls._url}/api/v1/dictionary/{name}",
+                auth=cls.auth),
+            fix_schema=False
+        )
+
+    def upload(self) -> None:
+        """Upload data dictionary using CDS API."""
+        self.logger.debug("Upload %s data dictionary", self.name)
+        self.send_message(
+            "POST",
+            "Publish CDS data dictionary",
+            f"{self.url}",
+            auth=self.auth,
+            data=json.dumps(self.data_dictionary_json)
+        )
+
+    def has_valid_schema(self) -> bool:
+        """Check data dictionary JSON schema.
+
+        Check the data dictionary JSON and return a bool indicating whether its schema is valid.
+        A valid schema means that the data dictionary has the following keys:
+         - "name"
+         - "tags"
+         - "data_type"
+         - "description"
+         - "entry_schema"
+         - "updatedBy"
+         - "definition"
+        The "definition" key value should contain the "raw" data dictionary.
+
+        Returns:
+            bool: True if the data dictionary has a valid schema, False otherwise
+
+        """
+        return all(key_to_check in self.data_dictionary_json for
+                   key_to_check in ["name", "tags", "data_type", "description", "entry_schema",
+                                    "updatedBy", "definition"])
+
+    def fix_schema(self) -> None:
+        """Fix data dictionary schema.
+
+        A "raw" data dictionary can be passed during initialization, but
+        this kind of data dictionary can't be uploaded to the blueprintprocessor.
+        This method tries to fix it. It can be done only if the "raw" data dictionary
+        has the following schema:
+            {
+                "name": "string",
+                "tags": "string",
+                "updated-by": "string",
+                "property": {
+                    "description": "string",
+                    "type": "string"
+                }
+            }
+
+        Raises:
+            ValidationError: Data dictionary doesn't have all required keys
+
+        """
+        try:
+            self.data_dictionary_json = {
+                "name": self.data_dictionary_json["name"],
+                "tags": self.data_dictionary_json["tags"],
+                "data_type": self.data_dictionary_json["property"]["type"],
+                "description": self.data_dictionary_json["property"]["description"],
+                "entry_schema": self.data_dictionary_json["property"]["type"],
+                "updatedBy": self.data_dictionary_json["updated-by"],
+                "definition": self.data_dictionary_json
+            }
+        except KeyError as exc:
+            raise ValidationError("Raw data dictionary JSON has invalid schema") from exc
+
+
+class DataDictionarySet:
+    """Data dictionary set.
+
+    Stores data dictionaries and uploads them to the server.
+    """
+
+    logger: Logger = getLogger(__name__)
+
+    def __init__(self) -> None:
+        """Initialize data dictionary set."""
+        self.dd_set = set()
+
+    @property
+    def length(self) -> int:
+        """Get the length of the data dictionary set.
+
+        Returns:
+            int: Number of data dictionaries in the set
+
+        """
+        return len(self.dd_set)
+
+    def add(self, data_dictionary: DataDictionary) -> None:
+        """Add data dictionary object to the set.
+
+        Based on the name, duplicates won't be added.
+
+        Args:
+            data_dictionary (DataDictionary): object to add to the set.
+
+        """
+        self.dd_set.add(data_dictionary)
+
+    def upload(self) -> None:
+        """Upload all data dictionaries using CDS API.
+
+        Raises:
+            RuntimeError: Raised if any data dictionary can't be uploaded to the server.
+                Data dictionaries uploaded before the one which raises the exception
+                won't be deleted from the server.
+
+        """
+        self.logger.debug("Upload data dictionary")
+        for data_dictionary in self.dd_set:  # type DataDictionary
+            data_dictionary.upload()  # raises a relevant exception on failure
+
+    def save_to_file(self, dd_file_path: str) -> None:
+        """Save data dictionaries to a file.
+
+        Args:
+            dd_file_path (str): Data dictionary file path.
+        """
+        with open(dd_file_path, "w") as dd_file:
+            dd_file.write(json.dumps([dd.data_dictionary_json for dd in self.dd_set], indent=4))
+
+    @classmethod
+    def load_from_file(cls, dd_file_path: str, fix_schema: bool = True) -> "DataDictionarySet":
+        """Create data dictionary set from a file.
+
+        The file has to contain valid JSON with a list of data dictionaries.
+
+        Args:
+            dd_file_path (str): Data dictionaries file path.
+            fix_schema (bool): Determines if the schema should be fixed or not.
+
+        Raises:
+            FileError: File to load data dictionaries from doesn't exist.
+
+        Returns:
+            DataDictionarySet: Data dictionary set with data dictionaries from the given file.
+ + """ + dd_set: DataDictionarySet = DataDictionarySet() + + try: + with open(dd_file_path, "r") as dd_file: # type file + dd_json: dict = json.loads(dd_file.read()) + for data_dictionary in dd_json: # type DataDictionary + dd_set.add(DataDictionary(data_dictionary, fix_schema=fix_schema)) + return dd_set + except FileNotFoundError as exc: + msg = "File with a set of data dictionaries does not exist." + raise FileError(msg) from exc |