aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/onboard
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2021-07-20 10:15:32 +0000
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2021-11-08 17:17:21 +0000
commitaad04fa2a7e1c9d71f273d26da25436e5c6b779c (patch)
tree8b247e3c7d2bf22a9b6d50b764afba6946b5e746 /src/onaptests/steps/onboard
parent1a77bcd6eb5ab695a232004b88e5e59d9d9ff6a3 (diff)
[TEST] CDS resource-resolution test
Issue-ID: TEST-291 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl> Change-Id: I5d9f55b67942c62f63e11282ef2383fe063d3137 (cherry picked from commit c31b7fb464fbb8bb0c7d8d2b3dc7b20f4a04cff5)
Diffstat (limited to 'src/onaptests/steps/onboard')
-rw-r--r--src/onaptests/steps/onboard/cds.py38
1 files changed, 36 insertions, 2 deletions
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py
index cbd69ce..9239c43 100644
--- a/src/onaptests/steps/onboard/cds.py
+++ b/src/onaptests/steps/onboard/cds.py
@@ -8,6 +8,7 @@ from typing import Any, Dict
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from onapsdk.cds import Blueprint, DataDictionarySet
+from onapsdk.cds.blueprint import Workflow
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings
import urllib3
@@ -222,12 +223,45 @@ class CbaPublishStep(CDSBaseStep):
@BaseStep.store_state
def execute(self) -> None:
- """Enrich CBA file.
+ """Publish CBA file.
Use settings values:
- - CDS_DD_FILE.
+ - CDS_CBA_ENRICHED.
"""
super().execute()
blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
blueprint.publish()
+
+
+class CbaProcessStep(CDSBaseStep):
+ """Process CBA step."""
+
+ def __init__(self, cleanup=False) -> None:
+ """Initialize CBA process step."""
+ super().__init__(cleanup=cleanup)
+ self.add_step(CbaPublishStep(cleanup=cleanup))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Process CBA file."
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Process CBA file.
+
+ Check if output is equal to expected
+
+ Use settings values:
+ - CDS_CBA_ENRICHED,
+ - CDS_WORKFLOW_NAME,
+ - CDS_WORKFLOW_INPUT
+
+ """
+ super().execute()
+ blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
+ workflow: Workflow = blueprint.get_workflow_by_name(settings.CDS_WORKFLOW_NAME)
+ output: Dict[str, Any] = workflow.execute(settings.CDS_WORKFLOW_INPUT)
+ if not output == settings.CDS_WORKFLOW_EXPECTED_OUTPUT:
+ raise OnapTestException("Response is not equal to the expected one")