diff options
Diffstat (limited to 'src/onaptests/steps/simulator/pnf_simulator_cnf')
-rw-r--r-- | src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py index c1d76b0..77c6222 100644 --- a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py +++ b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py @@ -7,6 +7,7 @@ from typing import Tuple import requests from kubernetes import client, config, watch from onapsdk.configuration import settings +import urllib3 from onaptests.steps.base import BaseStep from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep @@ -35,8 +36,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep): """Component name.""" return "Environment" - @staticmethod - def is_pnf_pod_running(timeout_seconds=120) -> bool: + def is_pnf_pod_running(self, timeout_seconds=120) -> bool: """Check if PNF simulator pod is running. Args: @@ -49,18 +49,21 @@ class PnfSimulatorCnfRegisterStep(BaseStep): config.load_kube_config(settings.K8S_CONFIG) k8s_client: "CoreV1API" = client.CoreV1Api() k8s_watch: "Watch" = watch.Watch() - for event in k8s_watch.stream(k8s_client.list_namespaced_pod, - namespace=settings.K8S_NAMESPACE, - timeout_seconds=timeout_seconds): - if event["object"].metadata.name == "pnf-simulator": - if not event["object"].status.phase in ["Pending", "Running"]: - # Invalid pod state - return False - return event["object"].status.phase == "Running" - return False - - @staticmethod - def get_ves_ip_and_port() -> Tuple[str, str]: + try: + for event in k8s_watch.stream(k8s_client.list_namespaced_pod, + namespace=settings.K8S_NAMESPACE, + timeout_seconds=timeout_seconds): + if event["object"].metadata.name == "pnf-simulator": + if not event["object"].status.phase in ["Pending", "Running"]: + # Invalid pod state + return False + return event["object"].status.phase == "Running" + return False + except urllib3.exceptions.HTTPError: + self._logger.error("Can't connect with k8s") + raise OnapTestException + + def get_ves_ip_and_port(self) -> Tuple[str, str]: """Static method to get VES ip address and port. Raises: @@ -72,10 +75,14 @@ class PnfSimulatorCnfRegisterStep(BaseStep): """ config.load_kube_config(settings.K8S_CONFIG) k8s_client: "CoreV1API" = client.CoreV1Api() - for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items: - if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME: - return service.spec.cluster_ip, service.spec.ports[0].port - raise EnvironmentPreparationException("Couldn't get VES ip and port") + try: + for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items: + if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME: + return service.spec.cluster_ip, service.spec.ports[0].port + raise EnvironmentPreparationException("Couldn't get VES ip and port") + except urllib3.exceptions.HTTPError: + self._logger.error("Can't connect with k8s") + raise OnapTestException @BaseStep.store_state def execute(self) -> None: |