aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/instantiate/sdnc_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/instantiate/sdnc_service.py')
-rw-r--r--src/onaptests/steps/instantiate/sdnc_service.py135
1 files changed, 128 insertions, 7 deletions
diff --git a/src/onaptests/steps/instantiate/sdnc_service.py b/src/onaptests/steps/instantiate/sdnc_service.py
index 7b8c600..0c10591 100644
--- a/src/onaptests/steps/instantiate/sdnc_service.py
+++ b/src/onaptests/steps/instantiate/sdnc_service.py
@@ -1,14 +1,21 @@
+import base64
import logging
+from typing import Dict
+import mysql.connector as mysql
+from kubernetes import client, config
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.sdnc import VfModulePreload
from onapsdk.sdnc.preload import PreloadInformation
+from onapsdk.sdnc.sdnc_element import SdncElement
from onapsdk.sdnc.services import Service
+from onapsdk.utils.headers_creator import headers_sdnc_creator
from onaptests.scenario.scenario_base import BaseScenarioStep
from onaptests.steps.base import BaseStep
-from onaptests.utils.exceptions import OnapTestException
+from onaptests.utils.exceptions import (EnvironmentPreparationException,
+ OnapTestException)
class BaseSdncStep(BaseStep):
@@ -32,6 +39,77 @@ class BaseSdncStep(BaseStep):
return "SDNC"
+class CheckSdncDbStep(BaseSdncStep):
+ """Check MariaDB connection status."""
+
+ SDNC_QUERY = "SELECT * FROM svc_logic LIMIT 1;"
+ SDNC_DATABASE = "sdnctl"
+ SDNC_DB_LOGIN = "login"
+ SDNC_DB_PASSWORD = "password"
+
+ def __init__(self):
+ """Initialize step."""
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+ self.login = None
+ self.password = None
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Check MariaDB connection."
+
+ def get_database_credentials(self):
+ """Resolve SDNC datbase credentials from k8s secret."""
+
+ if settings.IN_CLUSTER:
+ config.load_incluster_config()
+ else:
+ config.load_kube_config(config_file=settings.K8S_CONFIG)
+ api_instance = client.CoreV1Api()
+ try:
+ secret = api_instance.read_namespaced_secret(
+ settings.SDNC_SECRET_NAME, settings.K8S_ONAP_NAMESPACE)
+ if secret.data:
+ if (self.SDNC_DB_LOGIN in secret.data and self.SDNC_DB_PASSWORD in secret.data):
+ login_base64 = secret.data[self.SDNC_DB_LOGIN]
+ self.login = base64.b64decode(login_base64).decode("utf-8")
+ password_base64 = secret.data[self.SDNC_DB_PASSWORD]
+ self.password = base64.b64decode(password_base64).decode("utf-8")
+ else:
+ raise EnvironmentPreparationException(
+ "Login key or password key not found in secret")
+ else:
+ raise EnvironmentPreparationException("Secret data not found in secret")
+ except client.rest.ApiException as e:
+ self.login = None
+ self.password = None
+ raise EnvironmentPreparationException("Error accessing secret") from e
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Check MariaDB connection."""
+ super().execute()
+ self.get_database_credentials()
+ conn = None
+ try:
+ conn = mysql.connect(
+ database=settings.DATABASE,
+ host=settings.DB_PRIMARY_HOST,
+ port=settings.DB_PORT,
+ user=self.login,
+ password=self.password)
+ cursor = conn.cursor()
+ cursor.execute(settings.QUERY)
+ except Exception as e:
+ raise OnapTestException("Cannot connect to SDNC Database") from e
+ finally:
+ if conn:
+ try:
+ conn.close()
+ except Exception:
+ pass
+
+
class ServiceCreateStep(BaseSdncStep):
"""Service creation step."""
@@ -150,6 +228,42 @@ class UploadVfModulePreloadStep(BaseSdncStep):
)
+class CheckSdncHealthStep(BaseSdncStep, SdncElement):
+ """Check SDNC Health API response."""
+
+ headers: Dict[str, str] = headers_sdnc_creator(SdncElement.headers)
+
+ def __init__(self):
+ """Initialize step."""
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+
+ @property
+ def description(self) -> str:
+ """Step description.
+
+ Used for reports
+
+ Returns:
+ str: Step description
+
+ """
+ return "Check SDNC Health API response."
+
+ @BaseSdncStep.store_state
+ def execute(self) -> None:
+ super().execute()
+ result = self.send_message_json(
+ "POST",
+ "SDNC SLI API Healthcheck",
+ f"{self.base_url}/restconf/operations/SLI-API:healthcheck")
+ message = ""
+ if result and result["output"]:
+ if result["output"]["response-code"] == "200":
+ return
+ message = result["output"]["response-message"]
+ raise OnapTestException("SDNC is not healthy. %s" % message)
+
+
class GetSdncPreloadStep(BaseSdncStep):
"""Get preload information from SDNC.
@@ -191,15 +305,22 @@ class GetSdncPreloadStep(BaseSdncStep):
class TestSdncStep(BaseScenarioStep):
"""Top level step for SDNC tests."""
- def __init__(self):
+ def __init__(self, full: bool = True):
"""Initialize step.
+ Args:
+ full (bool): If the API logic calls should be executed
Sub steps:
- - UpdateSdncService.
+ - CheckSdncDbStep
+ - UpdateSdncService
+ - GetSdncPreloadStep
"""
super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
- self.add_step(UpdateSdncService())
- self.add_step(GetSdncPreloadStep())
+ self.add_step(CheckSdncDbStep())
+ self.add_step(CheckSdncHealthStep())
+ if full:
+ self.add_step(UpdateSdncService())
+ self.add_step(GetSdncPreloadStep())
@property
def description(self) -> str:
@@ -211,7 +332,7 @@ class TestSdncStep(BaseScenarioStep):
str: Step description
"""
- return "Test SDNC functionality scenario step"
+ return "Test SDNC functionality"
@property
def component(self) -> str:
@@ -224,4 +345,4 @@ class TestSdncStep(BaseScenarioStep):
str: Component name
"""
- return "TEST"
+ return "SDNC"