blob: a492479e23e7156003b26d30e9ada21f3a964a0e (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
"""Connectivity info creation module."""
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo
from ..base import BaseStep
class K8SConnectivityInfoStep(BaseStep):
"""CreateConnnectivityInfoStep."""
@property
def description(self) -> str:
"""Step description."""
return "Create K8S connectivity info."
@property
def component(self) -> str:
"""Component name."""
return "K8S plugin"
@BaseStep.store_state
def execute(self):
"""Creation k8s connectivity information.
Use settings values:
- CLOUD_REGION_ID,
- CLOUD_REGION_CLOUD_OWNER,
- K8S_CONFIG.
"""
super().execute()
######## Create Connectivity Info #########################################
try:
self._logger.info("Check if k8s connectivity information exists")
ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
except APIError:
self._logger.info("Create the k8s connectivity information")
ConnectivityInfo.create(settings.CLOUD_REGION_ID,
settings.CLOUD_REGION_CLOUD_OWNER,
open(settings.K8S_CONFIG, 'rb').read())
@BaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
"""Cleanup K8S Connectivity information."""
self._logger.info("Clean the k8s connectivity information")
super().cleanup()
connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
connectinfo.delete()
|