From 698fb766ab97734f0e6b72405a333bec657a47a4 Mon Sep 17 00:00:00 2001 From: Michal Jagiello Date: Wed, 18 Jan 2023 13:24:23 +0000 Subject: [CPS] Create basic_cps test Create test to check if creating basic cps resources (dataspace, schema-set, anchor) works and it's possible to create a node on an anchor. Issue-ID: INT-2194 Signed-off-by: Michal Jagiello Change-Id: I86c5a80da65b90e2f1a228fcc7fc46a54bee92bc --- src/onaptests/steps/onboard/cps.py | 144 +++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/onaptests/steps/onboard/cps.py (limited to 'src/onaptests/steps') diff --git a/src/onaptests/steps/onboard/cps.py b/src/onaptests/steps/onboard/cps.py new file mode 100644 index 0000000..1c59086 --- /dev/null +++ b/src/onaptests/steps/onboard/cps.py @@ -0,0 +1,144 @@ +# http://www.apache.org/licenses/LICENSE-2.0 +"""CPS onboard module.""" +from abc import ABC + +from onapsdk.configuration import settings +from onapsdk.cps import Anchor, Dataspace, SchemaSet + +from ..base import BaseStep + + +class CpsBaseStep(BaseStep, ABC): + """Abstract CPS base step.""" + + @property + def component(self) -> str: + """Component name.""" + return "CPS" + + +class CreateCpsDataspaceStep(CpsBaseStep): + """Step to create a dataspace.""" + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS dataspace." + + @BaseStep.store_state + def execute(self) -> None: + """Create a dataspace.""" + super().execute() + Dataspace.create(settings.DATASPACE_NAME) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete created dataspace.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + dataspace.delete() + super().cleanup() + + +class CreateCpsSchemaSetStep(CpsBaseStep): + """Step to check schema-set creation.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsDataspaceStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsDataspaceStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS bookstore schema set" + + @BaseStep.store_state + def execute(self) -> None: + """Get dataspace created on substep and create schema-set.""" + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + with settings.SCHEMA_SET_FILE.open("rb") as schema_set_file: + dataspace.create_schema_set(settings.SCHEMA_SET_NAME, schema_set_file) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete created schema-set.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + schema_set = dataspace.get_schema_set(settings.SCHEMA_SET_NAME) + schema_set.delete() + super().cleanup() + + +class CreateCpsAnchorStep(CpsBaseStep): + """Step to create an anchor.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsSchemaSetStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsSchemaSetStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS anchor" + + @BaseStep.store_state + def execute(self) -> None: + """Create anchor. + + Get dataspace and schema-set created substeps and create anchor. + """ + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + schema_set: SchemaSet = dataspace.get_schema_set(settings.SCHEMA_SET_NAME) + dataspace.create_anchor(schema_set, settings.ANCHOR_NAME) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete an anchor.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.delete() + super().cleanup() + + +class CreateCpsAnchorNodeStep(CpsBaseStep): + """Step to check node on anchor creation.""" + + def __init__(self, cleanup: bool = False) -> None: + """Initialize step. + + Substeps: + - CreateCpsAnchorStep. + """ + super().__init__(cleanup) + self.add_step(CreateCpsAnchorStep(cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Create CPS anchor node" + + @BaseStep.store_state + def execute(self) -> None: + """Create a node on an anchor created on substep.""" + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.create_node(settings.ANCHOR_DATA) + + @BaseStep.store_state(cleanup=True) + def cleanup(self) -> None: + """Delete nodes.""" + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.delete_nodes("/") + super().cleanup() -- cgit 1.2.3-korg