aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud/expose_service_node_port.py
blob: db5b646a456b726541a4ca9a402aafd8c70ae8ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# http://www.apache.org/licenses/LICENSE-2.0
"""Expose service NodePort module."""

from typing import Any, Dict

import urllib3
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from onapsdk.configuration import settings

from onaptests.steps.base import BaseStep
from onaptests.utils.exceptions import OnapTestException


class ExposeServiceNodePortStep(BaseStep):
    """Expose Service NodePort."""

    def __init__(self, component: str, service_name: str, port: int, node_port: int) -> None:
        """Initialize step."""
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.component_value = component
        self.service_name = service_name
        self.port = port
        self.node_port = node_port
        self.k8s_client: client.CoreV1Api = None

    @property
    def component(self) -> str:
        return self.component_value

    @property
    def description(self) -> str:
        """Step description."""
        return "Expose service NodePort."

    def is_service_node_port_type(self) -> bool:
        """Check if service type is 'NodePort'

        Raises:
            OnapTestException: Kubernetes API error

        Returns:
            bool: True if service type is 'NodePort', False otherwise

        """
        try:
            service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
                self.service_name,
                settings.K8S_TESTS_NAMESPACE
            )
            return service_data.spec.type == "NodePort"
        except ApiException as exc:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException from exc

    @BaseStep.store_state
    def execute(self) -> None:
        """Expose services ports using kubernetes client.

        Use settings values:
         - K8S_CONFIG,
         - K8S_TESTS_NAMESPACE.
         - EXPOSE_SERVICES_NODE_PORTS

        """
        super().execute()
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)
        self.k8s_client: client.CoreV1Api = client.CoreV1Api()
        if not self.is_service_node_port_type():
            try:
                self.k8s_client.patch_namespaced_service(
                    self.service_name,
                    settings.K8S_TESTS_NAMESPACE,
                    {"spec": {"ports": [{"port": self.port,
                                         "nodePort": self.node_port}],
                              "type": "NodePort"}}
                )
            except ApiException as exc:
                self._logger.exception("Kubernetes API exception")
                raise OnapTestException from exc
            except urllib3.exceptions.HTTPError as exc:
                self._logger.exception("Can't connect with k8s")
                raise OnapTestException from exc
        else:
            self._logger.debug("Service already patched, skip")

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Step cleanup.

        Restore service.

        """
        if self.is_service_node_port_type():
            try:
                self.k8s_client.patch_namespaced_service(
                    self.service_name,
                    settings.K8S_TESTS_NAMESPACE,
                    [
                        {
                            "op": "remove",
                            "path": "/spec/ports/0/nodePort"
                        },
                        {
                            "op": "replace",
                            "path": "/spec/type",
                            "value": "ClusterIP"
                        }
                    ]
                )
            except ApiException as exc:
                self._logger.exception("Kubernetes API exception")
                raise OnapTestException from exc
            except urllib3.exceptions.HTTPError as exc:
                self._logger.exception("Can't connect with k8s")
                raise OnapTestException from exc
        else:
            self._logger.debug("Service is not 'NodePort' type, skip")
        return super().cleanup()