diff options
Diffstat (limited to 'src/onaptests/steps/onboard/service.py')
-rw-r--r-- | src/onaptests/steps/onboard/service.py | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/src/onaptests/steps/onboard/service.py b/src/onaptests/steps/onboard/service.py index d8a627e..77456ff 100644 --- a/src/onaptests/steps/onboard/service.py +++ b/src/onaptests/steps/onboard/service.py @@ -2,6 +2,7 @@ import time from typing import Any, Dict, Iterator from urllib.parse import urlencode +import mysql.connector as mysql from onapsdk.aai.service_design_and_creation import Model from onapsdk.configuration import settings from onapsdk.exceptions import InvalidResponse, ResourceNotFound @@ -17,6 +18,7 @@ from yaml import SafeLoader, load import onaptests.utils.exceptions as onap_test_exceptions from onaptests.scenario.scenario_base import BaseScenarioStep +from onaptests.utils.kubernetes import KubernetesHelper from ..base import BaseStep, YamlTemplateBaseStep from .pnf import YamlTemplatePnfOnboardStep @@ -203,6 +205,7 @@ class VerifyServiceDistributionStep(BaseScenarioStep): notified_module=notified_module)) self.add_step(VerifyServiceDistributionInSoStep()) self.add_step(VerifyServiceDistributionInAaiStep()) + self.add_step(VerifyServiceDistributionInSdncStep()) @property def description(self) -> str: @@ -392,3 +395,50 @@ class VerifyServiceDistributionInAaiStep(BaseServiceDistributionComponentCheckSt msg = f"Service {self.service.name} is missing in AAI." self._logger.error(msg) raise onap_test_exceptions.ServiceDistributionException(msg) from e + + +class VerifyServiceDistributionInSdncStep(BaseServiceDistributionComponentCheckStep): + """Check service distribution in SDNC step.""" + + SDNC_DATABASE = "sdnctl" + SDNC_DB_LOGIN = "login" + SDNC_DB_PASSWORD = "password" + + def __init__(self): + """Initialize step.""" + BaseServiceDistributionComponentCheckStep.__init__( + self, component_name="SDNC") + + @BaseStep.store_state + def execute(self): + """Check service distribution status.""" + super().execute() + login, password = KubernetesHelper.get_credentials_from_secret( + settings.SDNC_SECRET_NAME, self.SDNC_DB_LOGIN, self.SDNC_DB_PASSWORD) + conn = None + try: + conn = mysql.connect( + database=self.SDNC_DATABASE, + host=settings.SDNC_DB_PRIMARY_HOST, + port=settings.SDNC_DB_PORT, + user=login, + password=password) + cursor = conn.cursor() + cursor.execute( + f"SELECT * FROM service_model WHERE service_uuid = '{self.service.uuid}'") + for _ in cursor: + pass + if cursor.rowcount == 0: + msg = f"Service {self.service.name} is missing in SDNC." + self._logger.error(msg) + raise onap_test_exceptions.ServiceDistributionException(msg) + cursor.close() + except Exception as e: + msg = f"Service {self.service.name} is missing in SDNC." + raise onap_test_exceptions.ServiceDistributionException(msg) from e + finally: + if conn: + try: + conn.close() + except Exception: + pass |