diff options
Diffstat (limited to 'src/onapsdk/aai/bulk.py')
-rw-r--r-- | src/onapsdk/aai/bulk.py | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/src/onapsdk/aai/bulk.py b/src/onapsdk/aai/bulk.py new file mode 100644 index 0000000..435a0b4 --- /dev/null +++ b/src/onapsdk/aai/bulk.py @@ -0,0 +1,90 @@ +"""A&AI bulk module.""" +# Copyright 2022 Orange, Deutsche Telekom AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import Any, Dict, Iterable + +from more_itertools import chunked + +from onapsdk.configuration import settings +from onapsdk.utils.jinja import jinja_env + +from .aai_element import AaiElement + + +@dataclass +class AaiBulkRequest: + """Class to store information about a request to be sent in A&AI bulk request.""" + + action: str + uri: str + body: Dict[Any, Any] + + +@dataclass +class AaiBulkResponse: + """Class to store A&AI bulk response.""" + + action: str + uri: str + status_code: int + body: str + + +class AaiBulk(AaiElement): + """A&AI bulk class. + + Use it to send bulk request to A&AI. With bulk request you can send + multiple requests at once. + """ + + @property + def url(self) -> str: + """Bulk url. + + Returns: + str: A&AI bulk API url. + + """ + return f"{self.base_url}{self.api_version}/bulk" + + @classmethod + def single_transaction(cls, aai_requests: Iterable[AaiBulkRequest])\ + -> Iterable[AaiBulkResponse]: + """Singe transaction bulk request. + + Args: + aai_requests (Iterable[AaiBulkRequest]): Iterable object of requests to be sent + as a bulk request. + + Yields: + Iterator[Iterable[AaiBulkResponse]]: Bulk request responses. Each object + correspond to the sent request. + + """ + for requests_chunk in chunked(aai_requests, settings.AAI_BULK_CHUNK): + for response in cls.send_message_json(\ + "POST",\ + "Send bulk A&AI request",\ + f"{cls.base_url}{cls.api_version}/bulk/single-transaction",\ + data=jinja_env().get_template(\ + "aai_bulk.json.j2").render(operations=requests_chunk)\ + )["operation-responses"]: + yield AaiBulkResponse( + action=response["action"], + uri=response["uri"], + status_code=response["response-status-code"], + body=response["response-body"] + ) |