aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud/expose_service_node_port.py
blob: 1f43a65926aa88872afb900f1519da1370722184 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# http://www.apache.org/licenses/LICENSE-2.0
"""Expose service NodePort module."""

from typing import Any, Dict

import urllib3
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from onapsdk.configuration import settings

from onaptests.steps.base import BaseStep
from onaptests.utils.exceptions import OnapTestException


class ExposeServiceNodePortStep(BaseStep):
    """Expose Service NodePort."""

    def __init__(self, component: str, service_name: str, port: int, node_port: int) -> None:
        """Initialize step."""
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.component_value = component
        self.service_name = service_name
        self.port = port
        self.node_port = node_port
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)
        self.k8s_client: client.CoreV1Api = client.CoreV1Api()

    @property
    def component(self) -> str:
        return self.component_value

    @property
    def description(self) -> str:
        """Step description."""
        return "Expose service NodePort."

    def is_service_node_port_type(self) -> bool:
        """Check if service type is 'NodePort'

        Raises:
            OnapTestException: Kubernetes API error

        Returns:
            bool: True if service type is 'NodePort', False otherwise

        """
        try:
            service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
                self.service_name,
                settings.K8S_ONAP_NAMESPACE
            )
            return service_data.spec.type == "NodePort"
        except ApiException:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException

    @BaseStep.store_state
    def execute(self) -> None:
        """Expose services ports using kubernetes client.

        Use settings values:
         - K8S_CONFIG,
         - K8S_ONAP_NAMESPACE.
         - EXPOSE_SERVICES_NODE_PORTS

        """
        super().execute()
        if not self.is_service_node_port_type():
            try:
                self.k8s_client.patch_namespaced_service(
                    self.service_name,
                    settings.K8S_ONAP_NAMESPACE,
                    {"spec": {"ports": [{"port": self.port,
                                         "nodePort": self.node_port}],
                              "type": "NodePort"}}
                )
            except ApiException:
                self._logger.exception("Kubernetes API exception")
                raise OnapTestException
            except urllib3.exceptions.HTTPError:
                self._logger.exception("Can't connect with k8s")
                raise OnapTestException
        else:
            self._logger.debug("Service already patched, skip")

    def cleanup(self) -> None:
        """Step cleanup.

        Restore service.

        """
        if self.is_service_node_port_type():
            try:
                self.k8s_client.patch_namespaced_service(
                    self.service_name,
                    settings.K8S_ONAP_NAMESPACE,
                    [
                        {
                            "op": "remove",
                            "path": "/spec/ports/0/nodePort"
                        },
                        {
                            "op": "replace",
                            "path": "/spec/type",
                            "value": "ClusterIP"
                        }
                    ]
                )
            except ApiException:
                self._logger.exception("Kubernetes API exception")
                raise OnapTestException
            except urllib3.exceptions.HTTPError:
                self._logger.exception("Can't connect with k8s")
                raise OnapTestException
        else:
            self._logger.debug("Service is not 'NodePort' type, skip")
        return super().cleanup()