1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# http://www.apache.org/licenses/LICENSE-2.0
"""Expose service NodePort module."""
from typing import Any, Dict
import urllib3
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from onapsdk.configuration import settings
from onaptests.steps.base import BaseStep
from onaptests.utils.exceptions import OnapTestException
class ExposeServiceNodePortStep(BaseStep):
    """Expose a Kubernetes service as a 'NodePort' service.

    The step patches the service named ``service_name`` in the
    ``settings.K8S_ONAP_NAMESPACE`` namespace so that ``port`` is published
    on ``node_port``. The cleanup switches the service back to 'ClusterIP'.
    """

    def __init__(self, component: str, service_name: str, port: int, node_port: int) -> None:
        """Initialize step.

        Loads the in-cluster Kubernetes configuration when
        ``settings.IN_CLUSTER`` is truthy, otherwise the kube config file
        pointed to by ``settings.K8S_CONFIG``.

        Args:
            component (str): Value returned by the ``component`` property.
            service_name (str): Name of the Kubernetes service to patch.
            port (int): Service port to expose.
            node_port (int): Node port the service port is exposed on.

        """
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.component_value = component
        self.service_name = service_name
        self.port = port
        self.node_port = node_port
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)
        self.k8s_client: client.CoreV1Api = client.CoreV1Api()

    @property
    def component(self) -> str:
        """Component name given at construction time."""
        return self.component_value

    @property
    def description(self) -> str:
        """Step description."""
        return "Expose service NodePort."

    def _patch_service(self, body: Any) -> None:
        """Patch the service with ``body`` and translate client errors.

        Args:
            body (Any): Patch payload handed to ``patch_namespaced_service``.

        Raises:
            OnapTestException: Kubernetes API error or connection failure.
                The original exception is kept as ``__cause__``.

        """
        try:
            self.k8s_client.patch_namespaced_service(
                self.service_name,
                settings.K8S_ONAP_NAMESPACE,
                body
            )
        except ApiException as exc:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException from exc
        except urllib3.exceptions.HTTPError as exc:
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException from exc

    def is_service_node_port_type(self) -> bool:
        """Check if service type is 'NodePort'.

        Raises:
            OnapTestException: Kubernetes API error or connection failure.
                The original exception is kept as ``__cause__``.

        Returns:
            bool: True if service type is 'NodePort', False otherwise

        """
        try:
            service_data: client.V1Service = self.k8s_client.read_namespaced_service(
                self.service_name,
                settings.K8S_ONAP_NAMESPACE
            )
        except ApiException as exc:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException from exc
        except urllib3.exceptions.HTTPError as exc:
            # Same mapping as for the patch calls, so execute() and cleanup()
            # fail with one exception type whichever request breaks.
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException from exc
        return service_data.spec.type == "NodePort"

    @BaseStep.store_state
    def execute(self) -> None:
        """Expose the service port using the kubernetes client.

        Does nothing (besides a debug log) when the service already is of
        'NodePort' type.

        Use settings values:
         - K8S_ONAP_NAMESPACE.

        Raises:
            OnapTestException: Kubernetes API error or connection failure.

        """
        super().execute()
        if self.is_service_node_port_type():
            self._logger.debug("Service already patched, skip")
            return
        patch_body: Dict[str, Any] = {
            "spec": {
                "ports": [{"port": self.port, "nodePort": self.node_port}],
                "type": "NodePort"
            }
        }
        self._patch_service(patch_body)

    def cleanup(self) -> None:
        """Step cleanup.

        Restore service: drop the node port and switch the type back to
        'ClusterIP'. Skipped when the service is not of 'NodePort' type.

        Raises:
            OnapTestException: Kubernetes API error or connection failure.

        """
        if self.is_service_node_port_type():
            # JSON Patch (RFC 6902) style operations.
            # NOTE(review): the path targets the first entry of spec.ports,
            # while execute() addresses the port by number - confirm this is
            # right for services that declare more than one port.
            self._patch_service(
                [
                    {
                        "op": "remove",
                        "path": "/spec/ports/0/nodePort"
                    },
                    {
                        "op": "replace",
                        "path": "/spec/type",
                        "value": "ClusterIP"
                    }
                ]
            )
        else:
            self._logger.debug("Service is not 'NodePort' type, skip")
        return super().cleanup()
|