diff options
author | Morgan Richomme <morgan.richomme@orange.com> | 2021-08-19 13:12:03 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2021-08-19 13:12:03 +0000 |
commit | 017e97ccae99714c2f4c0a50ddedd5ab1e5585a9 (patch) | |
tree | cf90655fa9d4481ea6517d380ac901a81fbd0074 /src/onaptests/steps/onboard/cds.py | |
parent | 0c9f5a021a6f4704e5c09a2db70160cd1d7e074d (diff) | |
parent | c9b3ea89ea31773cdb6ee343dc6ea803d0371739 (diff) |
Merge "CDS resource-resolution test"
Diffstat (limited to 'src/onaptests/steps/onboard/cds.py')
-rw-r--r-- | src/onaptests/steps/onboard/cds.py | 38 |
1 files changed, 36 insertions, 2 deletions
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py index cbd69ce..9239c43 100644 --- a/src/onaptests/steps/onboard/cds.py +++ b/src/onaptests/steps/onboard/cds.py @@ -8,6 +8,7 @@ from typing import Any, Dict from kubernetes import client, config from kubernetes.client.exceptions import ApiException from onapsdk.cds import Blueprint, DataDictionarySet +from onapsdk.cds.blueprint import Workflow from onapsdk.cds.blueprint_processor import Blueprintprocessor from onapsdk.configuration import settings import urllib3 @@ -222,12 +223,45 @@ class CbaPublishStep(CDSBaseStep): @BaseStep.store_state def execute(self) -> None: - """Enrich CBA file. + """Publish CBA file. Use settings values: - - CDS_DD_FILE. + - CDS_CBA_ENRICHED. """ super().execute() blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED) blueprint.publish() + + +class CbaProcessStep(CDSBaseStep): + """Process CBA step.""" + + def __init__(self, cleanup=False) -> None: + """Initialize CBA process step.""" + super().__init__(cleanup=cleanup) + self.add_step(CbaPublishStep(cleanup=cleanup)) + + @property + def description(self) -> str: + """Step description.""" + return "Process CBA file." + + @BaseStep.store_state + def execute(self) -> None: + """Process CBA file. + + Check if output is equal to expected + + Use settings values: + - CDS_CBA_ENRICHED, + - CDS_WORKFLOW_NAME, + - CDS_WORKFLOW_INPUT + + """ + super().execute() + blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED) + workflow: Workflow = blueprint.get_workflow_by_name(settings.CDS_WORKFLOW_NAME) + output: Dict[str, Any] = workflow.execute(settings.CDS_WORKFLOW_INPUT) + if not output == settings.CDS_WORKFLOW_EXPECTED_OUTPUT: + raise OnapTestException("Response is not equal to the expected one") |