aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r--src/onaptests/steps/cloud/expose_service_node_port.py118
-rw-r--r--src/onaptests/steps/instantiate/pnf_register_ves.py68
-rw-r--r--src/onaptests/steps/onboard/cds.py102
3 files changed, 190 insertions, 98 deletions
diff --git a/src/onaptests/steps/cloud/expose_service_node_port.py b/src/onaptests/steps/cloud/expose_service_node_port.py
new file mode 100644
index 0000000..ee820f7
--- /dev/null
+++ b/src/onaptests/steps/cloud/expose_service_node_port.py
@@ -0,0 +1,118 @@
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Expose service NodePort module."""
+
+from typing import Any, Dict
+
+import urllib3
+from kubernetes import client, config
+from kubernetes.client.exceptions import ApiException
+from onapsdk.configuration import settings
+
+from onaptests.steps.base import BaseStep
+from onaptests.utils.exceptions import OnapTestException
+
+
+class ExposeServiceNodePortStep(BaseStep):
+ """Expose Service NodePort."""
+
+ def __init__(self, component: str, service_name: str, port: int, node_port: int) -> None:
+ """Initialize step."""
+ super().__init__(cleanup=settings.CLEANUP_FLAG)
+ self.component_value = component
+ self.service_name = service_name
+ self.port = port
+ self.node_port = node_port
+ if settings.IN_CLUSTER:
+ config.load_incluster_config()
+ else:
+ config.load_kube_config(config_file=settings.K8S_CONFIG)
+ self.k8s_client: client.CoreV1Api = client.CoreV1Api()
+
+ @property
+ def component(self) -> str:
+ return self.component_value
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Expose service NodePort."
+
+ def is_service_node_port_type(self) -> bool:
+ """Check if service type is 'NodePort'
+
+ Raises:
+ OnapTestException: Kubernetes API error
+
+ Returns:
+ bool: True if service type is 'NodePort', False otherwise
+
+ """
+ try:
+ service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
+ self.service_name,
+ settings.K8S_ONAP_NAMESPACE
+ )
+ return service_data.spec.type == "NodePort"
+ except ApiException:
+ self._logger.exception("Kubernetes API exception")
+ raise OnapTestException
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Expose services ports using kubernetes client.
+
+ Use settings values:
+ - K8S_CONFIG,
+ - K8S_ONAP_NAMESPACE.
+ - EXPOSE_SERVICES_NODE_PORTS
+
+ """
+ super().execute()
+ if not self.is_service_node_port_type():
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_ONAP_NAMESPACE,
+ {"spec": {"ports": [{"port": self.port, "nodePort": self.node_port}], "type": "NodePort"}}
+ )
+ except ApiException:
+ self._logger.exception("Kubernetes API exception")
+ raise OnapTestException
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
+ else:
+ self._logger.debug("Service already patched, skip")
+
+ def cleanup(self) -> None:
+ """Step cleanup.
+
+ Restore service.
+
+ """
+ if self.is_service_node_port_type():
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_ONAP_NAMESPACE,
+ [
+ {
+ "op": "remove",
+ "path": "/spec/ports/0/nodePort"
+ },
+ {
+ "op": "replace",
+ "path": "/spec/type",
+ "value": "ClusterIP"
+ }
+ ]
+ )
+ except ApiException:
+ self._logger.exception("Kubernetes API exception")
+ raise OnapTestException
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
+ else:
+ self._logger.debug("Service is not 'NodePort' type, skip")
+ return super().cleanup()
diff --git a/src/onaptests/steps/instantiate/pnf_register_ves.py b/src/onaptests/steps/instantiate/pnf_register_ves.py
new file mode 100644
index 0000000..cc606ed
--- /dev/null
+++ b/src/onaptests/steps/instantiate/pnf_register_ves.py
@@ -0,0 +1,68 @@
+# http://www.apache.org/licenses/LICENSE-2.0
+"""PNF simulator registration module."""
+
+import time
+
+import requests
+from jinja2 import Environment, PackageLoader, select_autoescape
+from onapsdk.configuration import settings
+from onapsdk.ves.ves import Ves
+
+from onaptests.steps.base import BaseStep
+from onaptests.steps.cloud.expose_service_node_port import \
+ ExposeServiceNodePortStep
+from onaptests.utils.exceptions import OnapTestException
+
+
+class SendPnfRegisterVesEvent(BaseStep):
+ """PNF VES registration step."""
+
+ def __init__(self) -> None:
+ """Initialize step."""
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+ if settings.EXPOSE_SERVICES_NODE_PORTS:
+ self.add_step(ExposeServiceNodePortStep(component="VES-Collector",
+ service_name="dcae-ves-collector", port=8080, node_port=settings.VES_NODE_PORT))
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Register PNF with VES."
+
+ @property
+ def component(self) -> str:
+ """Component name."""
+ return "Environment"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Send PNF registration event."""
+ super().execute()
+ registration_number: int = 0
+
+ source_name = settings.SERVICE_INSTANCE_NAME
+ jinja_env = Environment(autoescape=select_autoescape(['jinja']),
+ loader=PackageLoader('onaptests.templates',
+ 'artifacts'))
+ template = jinja_env.get_template("pnf_register_ves_message.jinja")
+ event_data = template.render(
+ source_name=source_name)
+
+ registered_successfully: bool = False
+ while (registration_number < settings.PNF_REGISTRATION_NUMBER_OF_TRIES and
+ not registered_successfully):
+ try:
+ response = Ves.send_event(version="v7", json_event=event_data,
+ basic_auth=settings.VES_BASIC_AUTH)
+ if response is None:
+ raise OnapTestException("Failed to send event to VES SERVER")
+ response.raise_for_status()
+ registered_successfully = True
+ self._logger.info(f"PNF registered with {settings.SERVICE_INSTANCE_NAME} "
+ "source name")
+ except (requests.ConnectionError, requests.HTTPError) as http_error:
+ self._logger.debug(f"Can't send to ves: {str(http_error)}")
+ registration_number += 1
+ time.sleep(settings.PNF_WAIT_TIME)
+ if not registered_successfully:
+ raise OnapTestException("PNF not registered successfully")
diff --git a/src/onaptests/steps/onboard/cds.py b/src/onaptests/steps/onboard/cds.py
index 2074296..c22ff4f 100644
--- a/src/onaptests/steps/onboard/cds.py
+++ b/src/onaptests/steps/onboard/cds.py
@@ -5,15 +5,14 @@ from abc import ABC
from pathlib import Path
from typing import Any, Dict
-from kubernetes import client, config
-from kubernetes.client.exceptions import ApiException
from onapsdk.cds import Blueprint, DataDictionarySet
from onapsdk.cds.blueprint import Workflow
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings
-import urllib3
from onaptests.steps.base import BaseStep
+from onaptests.steps.cloud.expose_service_node_port import \
+ ExposeServiceNodePortStep
from onaptests.utils.exceptions import OnapTestException
@@ -26,104 +25,11 @@ class CDSBaseStep(BaseStep, ABC):
return "CDS"
-class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
+class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep, ExposeServiceNodePortStep):
"""Expose CDS blueprintsprocessor port."""
-
def __init__(self) -> None:
"""Initialize step."""
- super().__init__(cleanup=settings.CLEANUP_FLAG)
- self.service_name: str = "cds-blueprints-processor-http"
- if settings.IN_CLUSTER:
- config.load_incluster_config()
- else:
- config.load_kube_config(config_file=settings.K8S_CONFIG)
- self.k8s_client: client.CoreV1Api = client.CoreV1Api()
-
- @property
- def description(self) -> str:
- """Step description."""
- return "Expose CDS blueprintsprocessor NodePort."
-
- def is_service_node_port_type(self) -> bool:
- """Check if CDS blueprints processor service type is 'NodePort'
-
- Raises:
- OnapTestException: Kubernetes API error
-
- Returns:
- bool: True if service type is 'NodePort', False otherwise
-
- """
- try:
- service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
- self.service_name,
- settings.K8S_ONAP_NAMESPACE
- )
- return service_data.spec.type == "NodePort"
- except ApiException:
- self._logger.exception("Kubernetes API exception")
- raise OnapTestException
-
- @BaseStep.store_state
- def execute(self) -> None:
- """Expose CDS blueprintprocessor port using kubernetes client.
-
- Use settings values:
- - K8S_CONFIG,
- - K8S_ONAP_NAMESPACE.
- - EXPOSE_SERVICES_NODE_PORTS
-
- """
- super().execute()
- if not self.is_service_node_port_type():
- try:
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_ONAP_NAMESPACE,
- {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
- )
- except ApiException:
- self._logger.exception("Kubernetes API exception")
- raise OnapTestException
- except urllib3.exceptions.HTTPError:
- self._logger.exception("Can't connect with k8s")
- raise OnapTestException
- else:
- self._logger.debug("Service already patched, skip")
-
- @BaseStep.store_state(cleanup=True)
- def cleanup(self) -> None:
- """Step cleanup.
-
- Restore CDS blueprintprocessor service.
-
- """
- if self.is_service_node_port_type():
- try:
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_ONAP_NAMESPACE,
- [
- {
- "op": "remove",
- "path": "/spec/ports/0/nodePort"
- },
- {
- "op": "replace",
- "path": "/spec/type",
- "value": "ClusterIP"
- }
- ]
- )
- except ApiException:
- self._logger.exception("Kubernetes API exception")
- raise OnapTestException
- except urllib3.exceptions.HTTPError:
- self._logger.exception("Can't connect with k8s")
- raise OnapTestException
- else:
- self._logger.debug("Service is not 'NodePort' type, skip")
- return super().cleanup()
+ super().__init__(component = "CDS", service_name="cds-blueprints-processor-http", port=8080, node_port=settings.CDS_NODE_PORT)
class BootstrapBlueprintprocessor(CDSBaseStep):