diff options
Diffstat (limited to 'src/onaptests/steps/onboard/cps.py')
-rw-r--r-- | src/onaptests/steps/onboard/cps.py | 153 |
1 files changed, 152 insertions, 1 deletions
diff --git a/src/onaptests/steps/onboard/cps.py b/src/onaptests/steps/onboard/cps.py index 55598bf..60e1990 100644 --- a/src/onaptests/steps/onboard/cps.py +++ b/src/onaptests/steps/onboard/cps.py @@ -1,10 +1,15 @@ # http://www.apache.org/licenses/LICENSE-2.0 """CPS onboard module.""" +import base64 from abc import ABC +import pg8000 +from kubernetes import client, config from onapsdk.configuration import settings from onapsdk.cps import Anchor, Dataspace, SchemaSet +from onaptests.utils.exceptions import EnvironmentPreparationException + from ..base import BaseStep @@ -132,7 +137,7 @@ class CreateCpsAnchorStep(CpsBaseStep): class CreateCpsAnchorNodeStep(CpsBaseStep): - """Step to check node on anchor creation.""" + """Step to create anchor node.""" def __init__(self) -> None: """Initialize step. @@ -170,3 +175,149 @@ class CreateCpsAnchorNodeStep(CpsBaseStep): anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) anchor.delete_nodes("/") super().cleanup() + + +class UpdateCpsAnchorNodeStep(CpsBaseStep): + """Step to update node on anchor creation.""" + + def __init__(self) -> None: + """Initialize step. + + Substeps: + - CreateCpsAnchorNodeStep. + """ + super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP) + self.add_step(CreateCpsAnchorNodeStep()) + + @property + def description(self) -> str: + """Step description.""" + return "Update CPS anchor node" + + @BaseStep.store_state + def execute(self) -> None: + """Update a node on an anchor created on substep. + + Use settings values: + - DATASPACE_NAME, + - ANCHOR_NAME, + - ANCHOR_DATA_2. + + """ + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.update_node("/", settings.ANCHOR_DATA_2) + + +class QueryCpsAnchorNodeStep(CpsBaseStep): + """Step to check query on node.""" + + def __init__(self) -> None: + """Initialize step. + + Substeps: + - UpdateCpsAnchorNodeStep. + + """ + super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP) + self.add_step(UpdateCpsAnchorNodeStep()) + + @property + def description(self) -> str: + """Step description.""" + return "Query node" + + @BaseStep.store_state + def execute(self) -> None: + """Query on node on an anchor created on substep. + + Use settings values: + - DATASPACE_NAME, + - ANCHOR_NAME, + - QUERY_1, + - QUERY_2, + - QUERY_3. + + """ + super().execute() + dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME) + anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME) + anchor.query_node(settings.QUERY_1) + anchor.query_node(settings.QUERY_2) + anchor.query_node(settings.QUERY_3) + + +class CheckPostgressDataBaseConnectionStep(CpsBaseStep): + """Step to test connection with postgress.""" + + def __init__(self) -> None: + """Initialize step.""" + super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP) + + @property + def description(self) -> str: + """Step description.""" + return "Establish connection with Postgress and execute the query" + + def get_database_credentials(self): + config.load_kube_config() + api_instance = client.CoreV1Api() + try: + secret = api_instance.read_namespaced_secret( + settings.SECRET_NAME, settings.K8S_ONAP_NAMESPACE) + if secret.data: + if settings.DB_LOGIN in secret.data and settings.DB_PASSWORD in secret.data: + login_base64 = secret.data[settings.DB_LOGIN] + self.login = base64.b64decode(login_base64).decode("utf-8") + password_base64 = secret.data[settings.DB_PASSWORD] + self.password = base64.b64decode(password_base64).decode("utf-8") + else: + raise EnvironmentPreparationException( + "Login key or password key not found in secret") + else: + raise EnvironmentPreparationException("Secret data not found in secret") + except client.rest.ApiException as e: + self.login = None + self.password = None + raise EnvironmentPreparationException("Error accessing secret") from e + + def connect_to_postgress(self): + self.get_database_credentials() + if self.login and self.password: + db_params = { + "user": self.login, + "password": self.password, + "host": settings.DB_PRIMARY_HOST, + "database": settings.DATABASE, + "port": settings.DB_PORT + } + try: + connection = pg8000.connect(**db_params) + cursor = connection.cursor() + select_query = "SELECT * FROM yang_resource LIMIT 1;" + cursor.execute(select_query) + if cursor: + cursor.close() + if connection: + connection.close() + except pg8000.Error as e: + self._logger.exception(f"Error while connecting to PostgreSQL: {str(e)}") + raise + + @BaseStep.store_state + def execute(self) -> None: + """Establish connection with Postgress and execute the query. + + Use settings values: + - DB_PRIMARY_HOST, + - DATABASE, + - DB_PORT, + - K8S_ONAP_NAMESPACE, + - SECRET_NAME, + - DB_LOGIN, + - DB_PASSWORD. + + """ + super().execute() + self.connect_to_postgress() |