diff options
-rw-r--r-- | src/onaptests/steps/onboard/cds.py | 52 | ||||
-rw-r--r-- | src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py | 43 |
2 files changed, 56 insertions, 39 deletions
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py index f137adb..3ef37f2 100644 --- a/src/onaptests/steps/onboard/cds.py +++ b/src/onaptests/steps/onboard/cds.py @@ -9,8 +9,10 @@ from kubernetes import client, config from onapsdk.cds import Blueprint, DataDictionarySet from onapsdk.cds.blueprint_processor import Blueprintprocessor from onapsdk.configuration import settings +import urllib3 from ..base import BaseStep +from onaptests.utils.exceptions import OnapTestException class CDSBaseStep(BaseStep, ABC): @@ -48,11 +50,15 @@ class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep): super().execute() config.load_kube_config(settings.K8S_CONFIG) self.k8s_client = client.CoreV1Api() - self.k8s_client.patch_namespaced_service( - self.service_name, - settings.K8S_NAMESPACE, - {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}} - ) + try: + self.k8s_client.patch_namespaced_service( + self.service_name, + settings.K8S_NAMESPACE, + {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}} + ) + except urllib3.exceptions.HTTPError: + self._logger.exception("Can't connect with k8s") + raise OnapTestException def cleanup(self) -> None: """Step cleanup. @@ -60,22 +66,26 @@ class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep): Restore CDS blueprintprocessor service. """ - self.k8s_client.patch_namespaced_service( - self.service_name, - settings.K8S_NAMESPACE, - [ - { - "op": "remove", - "path": "/spec/ports/0/nodePort" - }, - { - "op": "replace", - "path": "/spec/type", - "value": "ClusterIP" - } - ] - ) - return super().cleanup() + try: + self.k8s_client.patch_namespaced_service( + self.service_name, + settings.K8S_NAMESPACE, + [ + { + "op": "remove", + "path": "/spec/ports/0/nodePort" + }, + { + "op": "replace", + "path": "/spec/type", + "value": "ClusterIP" + } + ] + ) + return super().cleanup() + except urllib3.exceptions.HTTPError: + self._logger.exception("Can't connect with k8s") + raise OnapTestException class BootstrapBlueprintprocessor(CDSBaseStep): diff --git a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py index a73f668..f394289 100644 --- a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py +++ b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py @@ -7,6 +7,7 @@ from typing import Tuple import requests from kubernetes import client, config, watch from onapsdk.configuration import settings +import urllib3 from onaptests.steps.base import BaseStep from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep @@ -35,8 +36,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep): """Component name.""" return "Environment" - @staticmethod - def is_pnf_pod_running(timeout_seconds=120) -> bool: + def is_pnf_pod_running(self, timeout_seconds=120) -> bool: """Check if PNF simulator pod is running. Args: @@ -49,18 +49,21 @@ class PnfSimulatorCnfRegisterStep(BaseStep): config.load_kube_config(settings.K8S_CONFIG) k8s_client: "CoreV1API" = client.CoreV1Api() k8s_watch: "Watch" = watch.Watch() - for event in k8s_watch.stream(k8s_client.list_namespaced_pod, - namespace=settings.K8S_NAMESPACE, - timeout_seconds=timeout_seconds): - if event["object"].metadata.name == "pnf-simulator": - if not event["object"].status.phase in ["Pending", "Running"]: - # Invalid pod state - return False - return event["object"].status.phase == "Running" - return False - - @staticmethod - def get_ves_ip_and_port() -> Tuple[str, str]: + try: + for event in k8s_watch.stream(k8s_client.list_namespaced_pod, + namespace=settings.K8S_NAMESPACE, + timeout_seconds=timeout_seconds): + if event["object"].metadata.name == "pnf-simulator": + if not event["object"].status.phase in ["Pending", "Running"]: + # Invalid pod state + return False + return event["object"].status.phase == "Running" + return False + except urllib3.exceptions.HTTPError: + self._logger.error("Can't connect with k8s") + raise OnapTestException + + def get_ves_ip_and_port(self) -> Tuple[str, str]: """Static method to get VES ip address and port. Raises: @@ -72,10 +75,14 @@ class PnfSimulatorCnfRegisterStep(BaseStep): """ config.load_kube_config(settings.K8S_CONFIG) k8s_client: "CoreV1API" = client.CoreV1Api() - for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items: - if service.metadata.name == "xdcae-ves-collector": - return service.spec.cluster_ip, service.spec.ports[0].port - raise EnvironmentPreparationException("Couldn't get VES ip and port") + try: + for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items: + if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME: + return service.spec.cluster_ip, service.spec.ports[0].port + raise EnvironmentPreparationException("Couldn't get VES ip and port") + except urllib3.exceptions.HTTPError: + self._logger.error("Can't connect with k8s") + raise OnapTestException @BaseStep.store_state def execute(self) -> None: |