diff options
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r-- | src/onaptests/steps/cloud/expose_service_node_port.py | 118 | ||||
-rw-r--r-- | src/onaptests/steps/instantiate/pnf_register_ves.py | 68 | ||||
-rw-r--r-- | src/onaptests/steps/onboard/cds.py | 102 |
3 files changed, 190 insertions, 98 deletions
diff --git a/src/onaptests/steps/cloud/expose_service_node_port.py b/src/onaptests/steps/cloud/expose_service_node_port.py new file mode 100644 index 0000000..ee820f7 --- /dev/null +++ b/src/onaptests/steps/cloud/expose_service_node_port.py @@ -0,0 +1,118 @@ +# http://www.apache.org/licenses/LICENSE-2.0 +"""Expose service NodePort module.""" + +from typing import Any, Dict + +import urllib3 +from kubernetes import client, config +from kubernetes.client.exceptions import ApiException +from onapsdk.configuration import settings + +from onaptests.steps.base import BaseStep +from onaptests.utils.exceptions import OnapTestException + + +class ExposeServiceNodePortStep(BaseStep): + """Expose Service NodePort.""" + + def __init__(self, component: str, service_name: str, port: int, node_port: int) -> None: + """Initialize step.""" + super().__init__(cleanup=settings.CLEANUP_FLAG) + self.component_value = component + self.service_name = service_name + self.port = port + self.node_port = node_port + if settings.IN_CLUSTER: + config.load_incluster_config() + else: + config.load_kube_config(config_file=settings.K8S_CONFIG) + self.k8s_client: client.CoreV1Api = client.CoreV1Api() + + @property + def component(self) -> str: + return self.component_value + + @property + def description(self) -> str: + """Step description.""" + return "Expose service NodePort." + + def is_service_node_port_type(self) -> bool: + """Check if service type is 'NodePort' + + Raises: + OnapTestException: Kubernetes API error + + Returns: + bool: True if service type is 'NodePort', False otherwise + + """ + try: + service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service( + self.service_name, + settings.K8S_ONAP_NAMESPACE + ) + return service_data.spec.type == "NodePort" + except ApiException: + self._logger.exception("Kubernetes API exception") + raise OnapTestException + + @BaseStep.store_state + def execute(self) -> None: + """Expose services ports using kubernetes client. + + Use settings values: + - K8S_CONFIG, + - K8S_ONAP_NAMESPACE. + - EXPOSE_SERVICES_NODE_PORTS + + """ + super().execute() + if not self.is_service_node_port_type(): + try: + self.k8s_client.patch_namespaced_service( + self.service_name, + settings.K8S_ONAP_NAMESPACE, + {"spec": {"ports": [{"port": self.port, "nodePort": self.node_port}], "type": "NodePort"}} + ) + except ApiException: + self._logger.exception("Kubernetes API exception") + raise OnapTestException + except urllib3.exceptions.HTTPError: + self._logger.exception("Can't connect with k8s") + raise OnapTestException + else: + self._logger.debug("Service already patched, skip") + + def cleanup(self) -> None: + """Step cleanup. + + Restore service. + + """ + if self.is_service_node_port_type(): + try: + self.k8s_client.patch_namespaced_service( + self.service_name, + settings.K8S_ONAP_NAMESPACE, + [ + { + "op": "remove", + "path": "/spec/ports/0/nodePort" + }, + { + "op": "replace", + "path": "/spec/type", + "value": "ClusterIP" + } + ] + ) + except ApiException: + self._logger.exception("Kubernetes API exception") + raise OnapTestException + except urllib3.exceptions.HTTPError: + self._logger.exception("Can't connect with k8s") + raise OnapTestException + else: + self._logger.debug("Service is not 'NodePort' type, skip") + return super().cleanup() diff --git a/src/onaptests/steps/instantiate/pnf_register_ves.py b/src/onaptests/steps/instantiate/pnf_register_ves.py new file mode 100644 index 0000000..cc606ed --- /dev/null +++ b/src/onaptests/steps/instantiate/pnf_register_ves.py @@ -0,0 +1,68 @@ +# http://www.apache.org/licenses/LICENSE-2.0 +"""PNF simulator registration module.""" + +import time + +import requests +from jinja2 import Environment, PackageLoader, select_autoescape +from onapsdk.configuration import settings +from onapsdk.ves.ves import Ves + +from onaptests.steps.base import BaseStep +from onaptests.steps.cloud.expose_service_node_port import \ + ExposeServiceNodePortStep +from onaptests.utils.exceptions import OnapTestException + + +class SendPnfRegisterVesEvent(BaseStep): + """PNF VES registration step.""" + + def __init__(self) -> None: + """Initialize step.""" + super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP) + if settings.EXPOSE_SERVICES_NODE_PORTS: + self.add_step(ExposeServiceNodePortStep(component="VES-Collector", + service_name="dcae-ves-collector", port=8080, node_port=settings.VES_NODE_PORT)) + + @property + def description(self) -> str: + """Step description.""" + return "Register PNF with VES." + + @property + def component(self) -> str: + """Component name.""" + return "Environment" + + @BaseStep.store_state + def execute(self) -> None: + """Send PNF registration event.""" + super().execute() + registration_number: int = 0 + + source_name = settings.SERVICE_INSTANCE_NAME + jinja_env = Environment(autoescape=select_autoescape(['jinja']), + loader=PackageLoader('onaptests.templates', + 'artifacts')) + template = jinja_env.get_template("pnf_register_ves_message.jinja") + event_data = template.render( + source_name=source_name) + + registered_successfully: bool = False + while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and + not registered_successfully): + try: + response = Ves.send_event(version="v7", json_event=event_data, + basic_auth=settings.VES_BASIC_AUTH) + if response is None: + raise OnapTestException("Failed to send event to VES SERVER") + response.raise_for_status() + registered_successfully = True + self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} " + "source name") + except (requests.ConnectionError, requests.HTTPError) as http_error: + self._logger.debug(f"Can't send to ves: {str(http_error)}") + registration_number += 1 + time.sleep(settings.PNF_WAIT_TIME) + if not registered_successfully: + raise OnapTestException("PNF not registered successfully") diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py index 2074296..c22ff4f 100644 --- a/src/onaptests/steps/onboard/cds.py +++ b/src/onaptests/steps/onboard/cds.py @@ -5,15 +5,14 @@ from abc import ABC from pathlib import Path from typing import Any, Dict -from kubernetes import client, config -from kubernetes.client.exceptions import ApiException from onapsdk.cds import Blueprint, DataDictionarySet from onapsdk.cds.blueprint import Workflow from onapsdk.cds.blueprint_processor import Blueprintprocessor from onapsdk.configuration import settings -import urllib3 from onaptests.steps.base import BaseStep +from onaptests.steps.cloud.expose_service_node_port import \ + ExposeServiceNodePortStep from onaptests.utils.exceptions import OnapTestException @@ -26,104 +25,11 @@ class CDSBaseStep(BaseStep, ABC): return "CDS" -class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep): +class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep, ExposeServiceNodePortStep): """Expose CDS blueprintsprocessor port.""" - def __init__(self) -> None: """Initialize step.""" - super().__init__(cleanup=settings.CLEANUP_FLAG) - self.service_name: str = "cds-blueprints-processor-http" - if settings.IN_CLUSTER: - config.load_incluster_config() - else: - config.load_kube_config(config_file=settings.K8S_CONFIG) - self.k8s_client: client.CoreV1Api = client.CoreV1Api() - - @property - def description(self) -> str: - """Step description.""" - return "Expose CDS blueprintsprocessor NodePort." - - def is_service_node_port_type(self) -> bool: - """Check if CDS blueprints processor service type is 'NodePort' - - Raises: - OnapTestException: Kubernetes API error - - Returns: - bool: True if service type is 'NodePort', False otherwise - - """ - try: - service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service( - self.service_name, - settings.K8S_ONAP_NAMESPACE - ) - return service_data.spec.type == "NodePort" - except ApiException: - self._logger.exception("Kubernetes API exception") - raise OnapTestException - - @BaseStep.store_state - def execute(self) -> None: - """Expose CDS blueprintprocessor port using kubernetes client. - - Use settings values: - - K8S_CONFIG, - - K8S_ONAP_NAMESPACE. - - EXPOSE_SERVICES_NODE_PORTS - - """ - super().execute() - if not self.is_service_node_port_type(): - try: - self.k8s_client.patch_namespaced_service( - self.service_name, - settings.K8S_ONAP_NAMESPACE, - {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}} - ) - except ApiException: - self._logger.exception("Kubernetes API exception") - raise OnapTestException - except urllib3.exceptions.HTTPError: - self._logger.exception("Can't connect with k8s") - raise OnapTestException - else: - self._logger.debug("Service already patched, skip") - - @BaseStep.store_state(cleanup=True) - def cleanup(self) -> None: - """Step cleanup. - - Restore CDS blueprintprocessor service. - - """ - if self.is_service_node_port_type(): - try: - self.k8s_client.patch_namespaced_service( - self.service_name, - settings.K8S_ONAP_NAMESPACE, - [ - { - "op": "remove", - "path": "/spec/ports/0/nodePort" - }, - { - "op": "replace", - "path": "/spec/type", - "value": "ClusterIP" - } - ] - ) - except ApiException: - self._logger.exception("Kubernetes API exception") - raise OnapTestException - except urllib3.exceptions.HTTPError: - self._logger.exception("Can't connect with k8s") - raise OnapTestException - else: - self._logger.debug("Service is not 'NodePort' type, skip") - return super().cleanup() + super().__init__(component = "CDS", service_name="cds-blueprints-processor-http", port=8080, node_port=settings.CDS_NODE_PORT) class BootstrapBlueprintprocessor(CDSBaseStep): |