# http://www.apache.org/licenses/LICENSE-2.0
"""PNF simulator registration module."""

import time
from typing import Tuple

import requests
import urllib3
from kubernetes import client, config, watch
from onapsdk.configuration import settings

from onaptests.steps.base import BaseStep
from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep
from onaptests.utils.exceptions import (EnvironmentPreparationException,
                                        OnapTestException)


class PnfSimulatorCnfRegisterStep(BaseStep):
    """PNF simulator registration step."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - CreateInstanceStep.
        """
        super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
        self.add_step(CreateInstanceStep())

    @property
    def description(self) -> str:
        """Step description."""
        return "Register PNF simulator with VES."

    @property
    def component(self) -> str:
        """Component name."""
        return "Environment"

    @staticmethod
    def _load_k8s_config() -> None:
        """Load the kubernetes client configuration.

        Uses the in-cluster configuration when ``settings.IN_CLUSTER`` is
        truthy, otherwise the kubeconfig file named by
        ``settings.K8S_CONFIG``.
        """
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)

    def is_pnf_pod_running(self, timeout_seconds=120) -> bool:
        """Check if PNF simulator pod is running.

        Watches pod events in ``settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE``
        and inspects the ones whose pod name is ``pnf-macro-test-simulator``.

        Args:
            timeout_seconds (int, optional): Timeout passed to the watch
                stream. Defaults to 120.

        Raises:
            OnapTestException: The watch stream failed with an urllib3
                HTTP error.

        Returns:
            bool: True if PNF simulator pod is running, False if it reached
                a phase other than "Pending"/"Running" or the stream ended
                without seeing it running.

        """
        self._load_k8s_config()
        k8s_client: "client.CoreV1Api" = client.CoreV1Api()
        k8s_watch: "watch.Watch" = watch.Watch()
        try:
            for event in k8s_watch.stream(
                    k8s_client.list_namespaced_pod,
                    namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
                    timeout_seconds=timeout_seconds):
                pod = event["object"]
                if pod.metadata.name != "pnf-macro-test-simulator":
                    continue
                phase = pod.status.phase
                if phase not in ("Pending", "Running"):
                    # Invalid pod state
                    return False
                if phase == "Running":
                    return True
                # Still "Pending": keep watching for further events.
            # Stream ended without the pod being reported as running.
            return False
        except urllib3.exceptions.HTTPError as exc:
            self._logger.error("Can't connect with k8s")
            raise OnapTestException from exc

    def get_ves_protocol_ip_and_port(self) -> Tuple[str, str, int]:
        """Get VES protocol, ip address and port.

        Looks up the service named ``settings.DCAE_VES_COLLECTOR_POD_NAME``
        in ``settings.K8S_TESTS_NAMESPACE`` and reads its cluster IP and
        first declared port. The protocol is "https" when the port number
        contains "443", "http" otherwise.

        Raises:
            EnvironmentPreparationException: VES service was not found
            OnapTestException: Listing or reading the services failed

        Returns:
            Tuple[str, str, int]: VES protocol, IP and port

        """
        self._load_k8s_config()
        k8s_client: "client.CoreV1Api" = client.CoreV1Api()
        try:
            services = k8s_client.list_namespaced_service(
                namespace=settings.K8S_TESTS_NAMESPACE).items
            for service in services:
                if service.metadata.name != settings.DCAE_VES_COLLECTOR_POD_NAME:
                    continue
                port = service.spec.ports[0].port
                proto = "https" if "443" in str(port) else "http"
                return proto, service.spec.cluster_ip, port
        except Exception as exc:
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException from exc
        # Raised outside the ``try`` on purpose: inside it, the broad handler
        # above would swallow this exception and report a connection problem.
        raise EnvironmentPreparationException("Couldn't get VES protocol, ip and port")

    @BaseStep.store_state
    def execute(self) -> None:
        """Send PNF registration event.

        Waits for the PNF simulator pod, resolves the VES endpoint and asks
        the simulator to start sending registration events. The request is
        retried up to ``settings.PNF_REGISTRATION_NUMBER_OF_TRIES`` times,
        sleeping ``settings.PNF_WAIT_TIME`` seconds after each failure.

        Raises:
            EnvironmentPreparationException: PNF simulator is not running
            OnapTestException: Every registration attempt failed

        """
        super().execute()
        if not self.is_pnf_pod_running():
            raise EnvironmentPreparationException("PNF simulator is not running")
        # Let's still wait for PNF simulator to make sure it's initialized
        time.sleep(settings.PNF_WAIT_TIME)
        ves_proto, ves_ip, ves_port = self.get_ves_protocol_ip_and_port()
        ves_url = f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7"
        # The payload does not change between attempts, so build it once.
        payload = {
            "simulatorParams": {
                "repeatCount": 9999,
                "repeatInterval": 30,
                "vesServerUrl": ves_url
            },
            "templateName": "registration.json",
            "patch": {
                "event": {
                    "commonEventHeader": {
                        "sourceName": settings.SERVICE_INSTANCE_NAME
                    },
                    "pnfRegistrationFields": {
                        "oamV4IpAddress": "192.168.0.1",
                        "oamV6IpAddress": "2001:db8::1428:57ab"
                    }
                }
            }
        }
        registration_number: int = 0
        registered_successfully: bool = False
        while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and
               not registered_successfully):
            try:
                response = requests.post(
                    "http://portal.api.simpledemo.onap.org:30999/simulator/start",
                    json=payload,
                    timeout=settings.DEFAULT_REQUEST_TIMEOUT
                )
                response.raise_for_status()
                registered_successfully = True
                self._logger.info("PNF registered with %s source name",
                                  settings.SERVICE_INSTANCE_NAME)
            except (requests.ConnectionError,
                    requests.HTTPError,
                    requests.Timeout) as http_error:
                # Timeout is listed explicitly: a read timeout is not a
                # ConnectionError and would otherwise skip the retries.
                self._logger.debug("Can't connect with PNF simulator: %s", http_error)
                registration_number = registration_number + 1
                time.sleep(settings.PNF_WAIT_TIME)
        if not registered_successfully:
            raise OnapTestException("PNF not registered successfully")