diff options
author | Lukasz Rajewski <lukasz.rajewski@t-mobile.pl> | 2023-02-03 11:46:04 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2023-02-03 11:46:04 +0000 |
commit | 00fb69a0dcfb8dfa4f5b540046ee1453f45bcf37 (patch) | |
tree | a94ad8b430b832f192e30d7dd1e1e02b3a758a71 /src/onaptests/steps | |
parent | 54b66f2da9ee9d9aafc07c3f78bd7d6b755e1d17 (diff) | |
parent | 698fb766ab97734f0e6b72405a333bec657a47a4 (diff) |
Merge "[CPS] Create basic_cps test"
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r-- | src/onaptests/steps/onboard/cps.py | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/src/onaptests/steps/onboard/cps.py b/src/onaptests/steps/onboard/cps.py new file mode 100644 index 0000000..1c59086 --- /dev/null +++ b/src/onaptests/steps/onboard/cps.py @@ -0,0 +1,144 @@ +# http://www.apache.org/licenses/LICENSE-2.0 +"""CPS onboard module.""" +from abc import ABC + +from onapsdk.configuration import settings +from onapsdk.cps import Anchor, Dataspace, SchemaSet + +from ..base import BaseStep + + +class CpsBaseStep(BaseStep, ABC): + """Abstract CPS base step.""" + + @property + def component(self) -> str: + """Component name.""" + return "CPS" + + +class CreateCpsDataspaceStep(CpsBaseStep): + """Step to create a dataspace.""" + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS dataspace." + + @BaseStep.store_state + def execute(self) -> None: + """Create a dataspace.""" + super().execute() + Dataspace.create(settings.DATASPACE_NAME) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete created dataspace.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + dataspace.delete() + super().cleanup() + + +class CreateCpsSchemaSetStep(CpsBaseStep): + """Step to check schema-set creation.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsDataspaceStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsDataspaceStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS bookstore schema set" + + @BaseStep.store_state + def execute(self) -> None: + """Get dataspace created on substep and create schema-set.""" + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + with settings.SCHEMA_SET_FILE.open("rb") as schema_set_file: + dataspace.create_schema_set(settings.SCHEMA_SET_NAME, schema_set_file) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete created schema-set.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + schema_set = dataspace.get_schema_set(settings.SCHEMA_SET_NAME) + schema_set.delete() + super().cleanup() + + +class CreateCpsAnchorStep(CpsBaseStep): + """Step to create an anchor.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsSchemaSetStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsSchemaSetStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS anchor" + + @BaseStep.store_state + def execute(self) -> None: + """Create anchor. + + Get dataspace and schema-set created substeps and create anchor. + """ + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + schema_set: SchemaSet = dataspace.get_schema_set(settings.SCHEMA_SET_NAME) + dataspace.create_anchor(schema_set, settings.ANCHOR_NAME) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete an anchor.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.delete() + super().cleanup() + + +class CreateCpsAnchorNodeStep(CpsBaseStep): + """Step to check node on anchor creation.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsAnchorStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsAnchorStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS anchor node" + + @BaseStep.store_state + def execute(self) -> None: + """Create a node on an anchor created on substep.""" + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.create_node(settings.ANCHOR_DATA) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete nodes.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.delete_nodes("/") + super().cleanup() |