diff options
author | Michal Jagiello <michal.jagiello@t-mobile.pl> | 2022-07-12 09:52:01 +0000 |
---|---|---|
committer | Michal Jagiello <michal.jagiello@t-mobile.pl> | 2022-07-18 09:49:01 +0000 |
commit | 0a7c213c87d9b0916080d62d8e97ac4d47851688 (patch) | |
tree | 4599fc1da6a74decdd789f70d01b057e8f426308 /onap_data_provider/resources/cps_resource.py | |
parent | dc0b8a7349225066f0e8e55c8decc0049696b21f (diff) |
[CPS] Create CPS resources using data provider0.7.0
Create:
- dataspace
- anchor
- schema set
Issue-ID: INT-2135
Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl>
Change-Id: I49fa9aa34bd4157372571275774c6a410d1bedcb
Diffstat (limited to 'onap_data_provider/resources/cps_resource.py')
-rw-r--r-- | onap_data_provider/resources/cps_resource.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/onap_data_provider/resources/cps_resource.py b/onap_data_provider/resources/cps_resource.py new file mode 100644 index 0000000..c8e666f --- /dev/null +++ b/onap_data_provider/resources/cps_resource.py @@ -0,0 +1,188 @@ +"""CPS resources module.""" +""" + Copyright 2022 Deutsche Telekom AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import logging +from abc import ABC +from typing import Any, Dict + +from onapsdk.cps import Anchor, Dataspace, SchemaSet # type: ignore +from onapsdk.exceptions import APIError, ResourceNotFound # type: ignore + +from onap_data_provider.resources.resource import Resource + + +class DataspaceSubresource(Resource, ABC): + """Abstract dataspace subresource class.""" + + def __init__(self, data: Dict[str, Any]) -> None: + """Initialize dataspace subresource""" + super().__init__(data) + + self._dataspace: Dataspace = None + self._schema_set: SchemaSet = None + + @property + def dataspace(self) -> Dataspace: + """Dataspace property. + + Return dataspace object by it name. + + Returns: + Dataspace: Dataspace object. + + """ + return Dataspace(self.data["dataspace-name"]) + + @property + def schema_set(self) -> SchemaSet: + """Schema set property. + + Raises: + ValueError: Dataspace which name was used to get schema-set object doesn't exist + + Returns: + SchemaSet: Schema set object + + """ + if not self._schema_set: + try: + self._schema_set = self.dataspace.get_schema_set(self.data["schema-set-name"]) + except APIError as api_error: + # Needs to be fixed when https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/181 is done + if "Dataspace not found" in str(api_error): + logging.error("Dataspace %s does not exist, create it before", self.data["dataspace-name"]) + raise ValueError("Dataspace %s does not exist, create it before", self.data["dataspace-name"]) + elif "Schema Set not found" in str(api_error): + logging.info("Schema set %s not found", self.data["schema-set-name"]) + return None + else: + raise + return self._schema_set + + +class AnchorResource(DataspaceSubresource): + """Anchor resource class + + Creates CPS anchor + """ + + def __init__(self, data: Dict[str, Any]) -> None: + """Initialize anchor resource""" + super().__init__(data) + + self._anchor: Anchor = None + + def create(self) -> None: + """Create anchor. + + Raises: + ValueError: Schema set doesn't exist + + """ + if not self.schema_set: + raise ValueError("Schema set %s does not exist, create it first", self.data["schema-set-name"]) + if not self.anchor: + self.dataspace.create_anchor(self.schema_set, self.data["anchor-name"]) + + @property + def anchor(self) -> Anchor: + """Anchor property. + + Tries to get anchor from dataspace. + + Returns: + Anchor: Anchor object, if anchor already exists. None otherwise. + + """ + if not self._anchor: + try: + self._anchor = self.dataspace.get_anchor(self.data["anchor-name"]) + except APIError as api_error: + if "Anchor not found" in str(api_error): + return None + else: + raise + return self._anchor + + +class SchemaSetResource(DataspaceSubresource): + """Schema set resource class + + Creates CPS schema set and + """ + + def __init__(self, data: Dict[str, Any]) -> None: + """Initialize schema set resource""" + super().__init__(data) + + def create(self) -> None: + """Create schema set resource.""" + if not self.schema_set: + with open(self.data["schema-set-file"], "rb") as schema_set_file: + self.dataspace.create_schema_set(self.data["schema-set-name"], schema_set_file) + + +class DataspaceResource(Resource): + """DataspaceResource resource class + + Creates CPS dataspace and sub-resources + """ + + def __init__(self, data: Dict[str, Any]) -> None: + """Initialize dataspace resource""" + super().__init__(data) + + self._dataspace: Dataspace = None + + def create(self) -> None: + try: + self._dataspace = Dataspace.create(self.data["dataspace-name"]) + except APIError as api_error: + # Needs to be fixed when https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/181 is done + if "409" in str(api_error): + logging.warning("Dataspace %s already exists", self.data["dataspace-name"]) + self._dataspace = Dataspace(self.data["dataspace-name"]) + else: + raise + + for schema_set_data in self.data.get("schema-sets", []): + try: + self._dataspace.get_schema_set(schema_set_data["schema-set-name"]) + except APIError: + # Needs to be fixed when https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/181 is done + with open(schema_set_data["schema-set-file"], "rb") as schema_set_file: + self._dataspace.create_schema_set( + schema_set_data["schema-set-name"], + schema_set_file + ) + else: + logging.warning("Schema set %s already exists", schema_set_data["schema-set-name"]) + + for anchor_data in self.data.get("anchors", []): + try: + self._dataspace.get_anchor(anchor_data["anchor-name"]) + except APIError: + try: + schema_set: SchemaSet = self._dataspace.get_schema_set(anchor_data["schema-set-name"]) + except APIError: + logging.warning("Schema set %s does not exist, anchor won't be created", + anchor_data["schema-set-name"]) + continue + self._dataspace.create_anchor(schema_set, + anchor_data["anchor-name"]) + else: + logging.warning("Anchor %s already exists", anchor_data["anchor-name"]) |