aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/onboard/cds.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/onboard/cds.py')
-rw-r--r--src/onaptests/steps/onboard/cds.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py
new file mode 100644
index 0000000..ed381ad
--- /dev/null
+++ b/src/onaptests/steps/onboard/cds.py
@@ -0,0 +1,129 @@
+# http://www.apache.org/licenses/LICENSE-2.0
+"""CDS onboard module."""
+
+from abc import ABC
+from pathlib import Path
+
+from kubernetes import client, config
+from onapsdk.cds import Blueprint, DataDictionarySet
+from onapsdk.cds.blueprint_processor import Blueprintprocessor
+from onapsdk.configuration import settings
+
+from ..base import BaseStep
+
+
+class CDSBaseStep(BaseStep, ABC):
+ """Abstract CDS base step."""
+
+ @property
+ def component(self) -> str:
+ """Component name."""
+ return "CDS"
+
+
+class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
+ """Expose CDS blueprintsprocessor port."""
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Expose CDS blueprintsprocessor NodePort."
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Expose CDS blueprintprocessor port using kubernetes client.
+
+ Use settings values:
+ - K8S_CONFIG,
+ - K8S_NAMESPACE.
+
+ """
+ super().execute()
+ config.load_kube_config(settings.K8S_CONFIG)
+ k8s_client = client.CoreV1Api()
+ k8s_client.patch_namespaced_service(
+ "cds-blueprints-processor-http",
+ settings.K8S_NAMESPACE,
+ {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
+ )
+
+
+class BootstrapBlueprintprocessor(CDSBaseStep):
+ """Bootstrap blueprintsprocessor."""
+
+ def __init__(self, cleanup: bool = False) -> None:
+ super().__init__(cleanup=cleanup)
+ self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Bootstrap CDS blueprintprocessor"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Bootsrap CDS blueprintprocessor"""
+ super().execute()
+ Blueprintprocessor.bootstrap()
+
+
+class DataDictionaryUploadStep(CDSBaseStep):
+ """Upload data dictionaries to CDS step."""
+
+ def __init__(self, cleanup: bool = False) -> None:
+ """Initialize data dictionary upload step."""
+ super().__init__(cleanup=cleanup)
+ self.add_step(BootstrapBlueprintprocessor(cleanup=cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Upload data dictionaries to CDS."
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Upload data dictionary to CDS.
+
+ Use settings values:
+ - CDS_DD_FILE.
+
+ """
+ super().execute()
+ dd_set: DataDictionarySet = DataDictionarySet.load_from_file(settings.CDS_DD_FILE)
+ dd_set.upload()
+
+
+class CbaEnrichStep(CDSBaseStep):
+ """Enrich CBA file step."""
+
+ def __init__(self, cleanup=False) -> None:
+ """Initialize CBA enrichment step."""
+ super().__init__(cleanup=cleanup)
+ self.add_step(DataDictionaryUploadStep(cleanup=cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Enrich CBA file."
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Enrich CBA file.
+
+ Use settings values:
+ - CDS_DD_FILE.
+
+ """
+ super().execute()
+ blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_UNENRICHED)
+ blueprint.enrich()
+ blueprint.save(settings.CDS_CBA_ENRICHED)
+
+ def cleanup(self) -> None:
+ """Cleanup enrichment step.
+
+ Delete enriched CBA file.
+
+ """
+ super().cleanup()
+ Path(settings.CDA_CBA_ENRICHED).unlink()