From f2adf542e878c96895210f97ebf1ebb763b2f465 Mon Sep 17 00:00:00 2001 From: Michal Jagiello Date: Mon, 17 Oct 2022 12:46:49 +0000 Subject: Release ONAP SDK Issue-ID: INT-2150 Signed-off-by: Michal Jagiello Change-Id: I650047c599a5aae6de7c6b42d38e34aea88578e2 --- src/onapsdk/msb/k8s/instance.py | 190 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 src/onapsdk/msb/k8s/instance.py (limited to 'src/onapsdk/msb/k8s/instance.py') diff --git a/src/onapsdk/msb/k8s/instance.py b/src/onapsdk/msb/k8s/instance.py new file mode 100644 index 0000000..196b9d2 --- /dev/null +++ b/src/onapsdk/msb/k8s/instance.py @@ -0,0 +1,190 @@ +"""Instantiation module.""" +# Copyright 2022 Orange, Deutsche Telekom AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator +from dataclasses import dataclass + +from onapsdk.msb import MSB +from onapsdk.utils.jinja import jinja_env + + +# pylint: disable=too-many-arguments +@dataclass +class InstantiationRequest: + """Instantiation Request class.""" + + def __init__(self, request: dict) -> None: + """Request object initialization. + + Args: + cloud_region_id (str): Cloud region ID + profile_name (str): Name of profile + rb_name (str): Definition name + rb_version (str): Definition version + override_values (dict): Optional parameters + labels (dict): Optional labels + """ + super().__init__() + self.cloud_region_id: str = request["cloud-region"] + self.profile_name: str = request["profile-name"] + self.rb_name: str = request["rb-name"] + self.rb_version: str = request["rb-version"] + self.override_values: dict = request["override-values"] + self.labels: dict = request["labels"] + + +@dataclass +class InstantiationParameter: + """Class to store instantiation parameters used to pass override_values and labels. + + Contains two values: name of parameter and it's value + """ + + name: str + value: str + + +class Instance(MSB): + """Instance class.""" + + base_url = f"{MSB.base_url}/api/multicloud-k8s/v1/v1/instance" + + def __init__(self, instance_id: str, + namespace: str, + request: InstantiationRequest, + resources: dict = None, + override_values: dict = None) -> None: + """Instance object initialization. + + Args: + instance_id (str): instance ID + namespace (str): namespace that instance is created in + request (InstantiationRequest): datails of the instantiation request + resources (dict): Created resources + override_values (dict): Optional values + """ + super().__init__() + self.instance_id: str = instance_id + self.namespace: str = namespace + self.request: InstantiationRequest = request + self.resources: dict = resources + self.override_values: dict = override_values + + @property + def url(self) -> str: + """URL address. + + Returns: + str: URL to Instance + + """ + return f"{self.base_url}/{self.instance_id}" + + @classmethod + def get_all(cls) -> Iterator["Instance"]: + """Get all instantiated Kubernetes resources. + + Yields: + Instantiation: Instantiation object + + """ + for resource in cls.send_message_json("GET", + "Get Kubernetes resources", + cls.base_url): + yield cls( + instance_id=resource["id"], + namespace=resource["namespace"], + request=InstantiationRequest(resource["request"]) + ) + + @classmethod + def get_by_id(cls, instance_id: str) -> "Instance": + """Get Kubernetes resource by id. + + Args: + instance_id (str): instance ID + + Returns: + Instantiation: Instantiation object + + """ + url: str = f"{cls.base_url}/{instance_id}" + resource: dict = cls.send_message_json( + "GET", + "Get Kubernetes resource by id", + url + ) + return cls( + instance_id=resource["id"], + namespace=resource["namespace"], + request=InstantiationRequest(resource["request"]), + resources=resource["resources"], + override_values=resource.get("override-values") + ) + + @classmethod + def create(cls, + cloud_region_id: str, + profile_name: str, + rb_name: str, + rb_version: str, + override_values: dict = None, + labels: dict = None) -> "Instance": + """Create Instance. + + Args: + cloud_region_id (str): Cloud region ID + profile_name (str): Name of profile to be instantiated + rb_name: (bytes): Definition name + rb_version (str): Definition version + override_values (dict): List of optional override values + labels (dict): List of optional labels + + Returns: + Instance: Created object + + """ + if labels is None: + labels = {} + if override_values is None: + override_values = {} + url: str = f"{cls.base_url}" + response: dict = cls.send_message_json( + "POST", + "Create Instance", + url, + data=jinja_env().get_template("multicloud_k8s_instantiate.json.j2").render( + cloud_region_id=cloud_region_id, + profile_name=profile_name, + rb_name=rb_name, + rb_version=rb_version, + override_values=override_values, + labels=labels), + headers={} + ) + return cls( + instance_id=response["id"], + namespace=response["namespace"], + request=InstantiationRequest(response["request"]), + resources=response["resources"], + override_values=response.get("override-values") + ) + + def delete(self) -> None: + """Delete Instance object.""" + self.send_message( + "DELETE", + f"Delete {self.instance_id} instance", + self.url + ) -- cgit 1.2.3-korg