aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/onaptests/steps/onboard/cds.py52
-rw-r--r--src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py45
2 files changed, 57 insertions, 40 deletions
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py
index f137adb..3ef37f2 100644
--- a/src/onaptests/steps/onboard/cds.py
+++ b/src/onaptests/steps/onboard/cds.py
@@ -9,8 +9,10 @@ from kubernetes import client, config
from onapsdk.cds import Blueprint, DataDictionarySet
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings
+import urllib3
from ..base import BaseStep
+from onaptests.utils.exceptions import OnapTestException
class CDSBaseStep(BaseStep, ABC):
@@ -48,11 +50,15 @@ class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
super().execute()
config.load_kube_config(settings.K8S_CONFIG)
self.k8s_client = client.CoreV1Api()
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_NAMESPACE,
- {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
- )
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_NAMESPACE,
+ {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
+ )
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
def cleanup(self) -> None:
"""Step cleanup.
@@ -60,22 +66,26 @@ class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
Restore CDS blueprintprocessor service.
"""
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_NAMESPACE,
- [
- {
- "op": "remove",
- "path": "/spec/ports/0/nodePort"
- },
- {
- "op": "replace",
- "path": "/spec/type",
- "value": "ClusterIP"
- }
- ]
- )
- return super().cleanup()
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_NAMESPACE,
+ [
+ {
+ "op": "remove",
+ "path": "/spec/ports/0/nodePort"
+ },
+ {
+ "op": "replace",
+ "path": "/spec/type",
+ "value": "ClusterIP"
+ }
+ ]
+ )
+ return super().cleanup()
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
class BootstrapBlueprintprocessor(CDSBaseStep):
diff --git a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
index c1d76b0..3483351 100644
--- a/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
+++ b/src/onaptests/steps/simulator/pnf_simulator_cnf/pnf_register.py
@@ -7,6 +7,7 @@ from typing import Tuple
import requests
from kubernetes import client, config, watch
from onapsdk.configuration import settings
+import urllib3
from onaptests.steps.base import BaseStep
from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep
@@ -35,8 +36,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
"""Component name."""
return "Environment"
- @staticmethod
- def is_pnf_pod_running(timeout_seconds=120) -> bool:
+ def is_pnf_pod_running(self, timeout_seconds=120) -> bool:
"""Check if PNF simulator pod is running.
Args:
@@ -49,18 +49,21 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
config.load_kube_config(settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
k8s_watch: "Watch" = watch.Watch()
- for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
- namespace=settings.K8S_NAMESPACE,
- timeout_seconds=timeout_seconds):
- if event["object"].metadata.name == "pnf-simulator":
- if not event["object"].status.phase in ["Pending", "Running"]:
- # Invalid pod state
- return False
- return event["object"].status.phase == "Running"
- return False
-
- @staticmethod
- def get_ves_ip_and_port() -> Tuple[str, str]:
+ try:
+ for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
+ namespace=settings.K8S_NAMESPACE,
+ timeout_seconds=timeout_seconds):
+ if event["object"].metadata.name == "pnf-simulator":
+ if not event["object"].status.phase in ["Pending", "Running"]:
+ # Invalid pod state
+ return False
+ return event["object"].status.phase == "Running"
+ return False
+ except urllib3.exceptions.HTTPError:
+ self._logger.error("Can't connect with k8s")
+ raise OnapTestException
+
+ def get_ves_ip_and_port(self) -> Tuple[str, str]:
"""Static method to get VES ip address and port.
Raises:
@@ -72,10 +75,14 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
"""
config.load_kube_config(settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
- for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items:
- if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
- return service.spec.cluster_ip, service.spec.ports[0].port
- raise EnvironmentPreparationException("Couldn't get VES ip and port")
+ try:
+ for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items:
+ if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
+ return service.spec.cluster_ip, service.spec.ports[0].port
+ raise EnvironmentPreparationException("Couldn't get VES ip and port")
+ except urllib3.exceptions.HTTPError:
+ self._logger.error("Can't connect with k8s")
+ raise OnapTestException
@BaseStep.store_state
def execute(self) -> None:
@@ -110,7 +117,7 @@ class PnfSimulatorCnfRegisterStep(BaseStep):
response.raise_for_status()
registered_successfully = True
self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} source name")
- except requests.HTTPError as http_error:
+ except (requests.ConnectionError, requests.HTTPError) as http_error:
self._logger.debug(f"Can't connect with PNF simulator: {str(http_error)}")
registration_number = registration_number + 1
time.sleep(settings.PNF_WAIT_TIME)