blob: ef958763b6106f1284709b94aaa9fbd579d08266 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from onapsdk.configuration import settings
from onapsdk.msb.k8s import ConnectivityInfo
from ..base import BaseStep
class K8SConnectivityInfoStep(BaseStep):
"""CreateConnnectivityInfoStep."""
@BaseStep.store_state
def execute(self):
"""Creation k8s connectivity information
Use settings values:
- CLOUD_REGION_ID,
- CLOUD_REGION_CLOUD_OWNER,
- K8S_KUBECONFIG_FILE.
"""
super().execute()
######## Create Connectivity Info #########################################
try:
ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
except ValueError:
ConnectivityInfo.create(settings.CLOUD_REGION_ID,
settings.CLOUD_REGION_CLOUD_OWNER,
open(settings.K8S_KUBECONFIG_FILE, 'rb').read())
def cleanup(self) -> None:
"""Cleanup K8S Connectivity information.
"""
self._logger.info("*Clean the k8s connectivity information *")
super().cleanup()
connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
connectinfo.delete()
|