aboutsummaryrefslogtreecommitdiffstats
path: root/src/onapsdk/msb/k8s/connectivity_info.py
blob: 71a43c13d4e69c25bbed5994860419d54b11efcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Connectivity-Info module."""
#   Copyright 2022 Orange, Deutsche Telekom AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
from onapsdk.utils.jinja import jinja_env
from ..msb_service import MSB


class ConnectivityInfo(MSB):
    """Connectivity-Info class."""

    api_version = "/api/multicloud-k8s/v1/v1"
    url = f"{MSB.base_url}{api_version}/connectivity-info"

    def __init__(self, cloud_region_id: str,
                 cloud_owner: str,
                 other_connectivity_list: dict,
                 kubeconfig: str) -> None:
        """Connectivity-info object initialization.

        Args:
            cloud_region_id (str): Cloud region ID
            cloud_owner (str): Cloud owner name
            other_connectivity_list (dict): Optional other connectivity list
            kubeconfig (str): kubernetes cluster kubeconfig
        """
        super().__init__()
        self.cloud_region_id: str = cloud_region_id
        self.cloud_owner: str = cloud_owner
        self.other_connectivity_list: dict = other_connectivity_list
        self.kubeconfig: str = kubeconfig

    @classmethod
    def get_connectivity_info_by_region_id(cls, cloud_region_id: str) -> "ConnectivityInfo":
        """Get connectivity-info by its name (cloud region id).

        Args:
            cloud_region_id (str): Cloud region ID

        Returns:
            ConnectivityInfo: Connectivity-Info object

        """
        url: str = f"{cls.url}/{cloud_region_id}"
        connectivity_info: dict = cls.send_message_json(
            "GET",
            "Get Connectivity Info",
            url
        )
        return cls(
            connectivity_info["cloud-region"],
            connectivity_info["cloud-owner"],
            connectivity_info.get("other-connectivity-list"),
            connectivity_info["kubeconfig"]
        )

    def delete(self) -> None:
        """Delete connectivity info."""
        url: str = f"{self.url}/{self.cloud_region_id}"
        self.send_message(
            "DELETE",
            "Delete Connectivity Info",
            url
        )

    @classmethod
    def create(cls,
               cloud_region_id: str,
               cloud_owner: str,
               kubeconfig: bytes = None) -> "ConnectivityInfo":
        """Create Connectivity Info.

        Args:
            cloud_region_id (str): Cloud region ID
            cloud_owner (str): Cloud owner name
            kubeconfig (bytes): kubernetes cluster kubeconfig file

        Returns:
            ConnectivityInfo: Created object

        """
        json_file = jinja_env().get_template("multicloud_k8s_add_connectivity_info.json.j2").render(
            cloud_region_id=cloud_region_id,
            cloud_owner=cloud_owner
        )
        url: str = f"{cls.url}"
        cls.send_message(
            "POST",
            "Create Connectivity Info",
            url,
            files={"file": kubeconfig,
                   "metadata": (None, json_file)},
            headers={}
        )
        return cls.get_connectivity_info_by_region_id(cloud_region_id)