aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/cloud/k8s_connectivity_info_create.py
blob: a492479e23e7156003b26d30e9ada21f3a964a0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""Connectivity info creation module."""
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo

from ..base import BaseStep

class K8SConnectivityInfoStep(BaseStep):
    """CreateConnnectivityInfoStep."""

    @property
    def description(self) -> str:
        """Step description."""
        return "Create K8S connectivity info."

    @property
    def component(self) -> str:
        """Component name."""
        return "K8S plugin"

    @BaseStep.store_state
    def execute(self):
        """Creation k8s connectivity information.

        Use settings values:
         - CLOUD_REGION_ID,
         - CLOUD_REGION_CLOUD_OWNER,
         - K8S_CONFIG.
        """
        super().execute()
        ######## Create Connectivity Info #########################################
        try:
            self._logger.info("Check if k8s connectivity information exists")
            ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
        except APIError:
            self._logger.info("Create the k8s connectivity information")
            ConnectivityInfo.create(settings.CLOUD_REGION_ID,
                                    settings.CLOUD_REGION_CLOUD_OWNER,
                                    open(settings.K8S_CONFIG, 'rb').read())

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Cleanup K8S Connectivity information."""
        self._logger.info("Clean the k8s connectivity information")
        super().cleanup()
        connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
        connectinfo.delete()