aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps
diff options
context:
space:
mode:
authorLukasz Rajewski <lukasz.rajewski@t-mobile.pl>2023-02-03 11:46:04 +0000
committerGerrit Code Review <gerrit@onap.org>2023-02-03 11:46:04 +0000
commit00fb69a0dcfb8dfa4f5b540046ee1453f45bcf37 (patch)
treea94ad8b430b832f192e30d7dd1e1e02b3a758a71 /src/onaptests/steps
parent54b66f2da9ee9d9aafc07c3f78bd7d6b755e1d17 (diff)
parent698fb766ab97734f0e6b72405a333bec657a47a4 (diff)
Merge "[CPS] Create basic_cps test"
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r--src/onaptests/steps/onboard/cps.py144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/onaptests/steps/onboard/cps.py b/src/onaptests/steps/onboard/cps.py
new file mode 100644
index 0000000..1c59086
--- /dev/null
+++ b/src/onaptests/steps/onboard/cps.py
@@ -0,0 +1,144 @@
+# http://www.apache.org/licenses/LICENSE-2.0
+"""CPS onboard module."""
+from abc import ABC
+
+from onapsdk.configuration import settings
+from onapsdk.cps import Anchor, Dataspace, SchemaSet
+
+from ..base import BaseStep
+
+
+class CpsBaseStep(BaseStep, ABC):
+ """Abstract CPS base step."""
+
+ @property
+ def component(self) -> str:
+ """Component name."""
+ return "CPS"
+
+
+class CreateCpsDataspaceStep(CpsBaseStep):
+ """Step to create a dataspace."""
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Create CPS dataspace."
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Create a dataspace."""
+ super().execute()
+ Dataspace.create(settings.DATASPACE_NAME)
+
+ @BaseStep.store_state(cleanup=True)
+ def cleanup(self) -> None:
+ """Delete created dataspace."""
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ dataspace.delete()
+ super().cleanup()
+
+
+class CreateCpsSchemaSetStep(CpsBaseStep):
+ """Step to check schema-set creation."""
+
+ def __init__(self, cleanup: bool = False) -> None:
+ """Initialize step.
+
+ Substeps:
+ - CreateCpsDataspaceStep.
+ """
+ super().__init__(cleanup)
+ self.add_step(CreateCpsDataspaceStep(cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Create CPS bookstore schema set"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Get dataspace created on substep and create schema-set."""
+ super().execute()
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ with settings.SCHEMA_SET_FILE.open("rb") as schema_set_file:
+ dataspace.create_schema_set(settings.SCHEMA_SET_NAME, schema_set_file)
+
+ @BaseStep.store_state(cleanup=True)
+ def cleanup(self) -> None:
+ """Delete created schema-set."""
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ schema_set = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
+ schema_set.delete()
+ super().cleanup()
+
+
+class CreateCpsAnchorStep(CpsBaseStep):
+ """Step to create an anchor."""
+
+ def __init__(self, cleanup: bool = False) -> None:
+ """Initialize step.
+
+ Substeps:
+ - CreateCpsSchemaSetStep.
+ """
+ super().__init__(cleanup)
+ self.add_step(CreateCpsSchemaSetStep(cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Create CPS anchor"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Create anchor.
+
+ Get dataspace and schema-set created substeps and create anchor.
+ """
+ super().execute()
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ schema_set: SchemaSet = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
+ dataspace.create_anchor(schema_set, settings.ANCHOR_NAME)
+
+ @BaseStep.store_state(cleanup=True)
+ def cleanup(self) -> None:
+ """Delete an anchor."""
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
+ anchor.delete()
+ super().cleanup()
+
+
+class CreateCpsAnchorNodeStep(CpsBaseStep):
+ """Step to check node on anchor creation."""
+
+ def __init__(self, cleanup: bool = False) -> None:
+ """Initialize step.
+
+ Substeps:
+ - CreateCpsAnchorStep.
+ """
+ super().__init__(cleanup)
+ self.add_step(CreateCpsAnchorStep(cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Create CPS anchor node"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Create a node on an anchor created on substep."""
+ super().execute()
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
+ anchor.create_node(settings.ANCHOR_DATA)
+
+ @BaseStep.store_state(cleanup=True)
+ def cleanup(self) -> None:
+ """Delete nodes."""
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
+ anchor.delete_nodes("/")
+ super().cleanup()