aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/cloud/k8s_connectivity_info_create.py')
-rw-r--r--src/onaptests/steps/cloud/k8s_connectivity_info_create.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/src/onaptests/steps/cloud/k8s_connectivity_info_create.py b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
new file mode 100644
index 0000000..6106d7e
--- /dev/null
+++ b/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
@@ -0,0 +1,35 @@
+from onapsdk.configuration import settings
+from onapsdk.msb.k8s import ConnectivityInfo
+
+from ..base import BaseStep
+
+class K8SConnectivityInfoStep(BaseStep):
+ """CreateConnnectivityInfoStep."""
+
+ @BaseStep.store_state
+ def execute(self):
+ """Creation k8s connectivity information
+
+ Use settings values:
+ - CLOUD_REGION_ID,
+ - CLOUD_REGION_CLOUD_OWNER,
+ - K8S_KUBECONFIG_FILE.
+ """
+ super().execute()
+ ######## Create Connectivity Info #########################################
+ try:
+ self._logger.info("Check if k8s connectivity information exists")
+ ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+ except ValueError:
+ self._logger.info("Create the k8s connectivity information")
+ ConnectivityInfo.create(settings.CLOUD_REGION_ID,
+ settings.CLOUD_REGION_CLOUD_OWNER,
+ open(settings.K8S_KUBECONFIG_FILE, 'rb').read())
+
+ def cleanup(self) -> None:
+ """Cleanup K8S Connectivity information.
+ """
+ self._logger.info("Clean the k8s connectivity information")
+ super().cleanup()
+ connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+ connectinfo.delete()