aboutsummaryrefslogtreecommitdiffstats
path: root/src/onapsdk/aai/bulk.py
blob: 1f676c0b8218164a740a4179b4de825f1fd5f10b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""A&AI bulk module."""
#   Copyright 2022 Orange, Deutsche Telekom AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from dataclasses import dataclass
from re import compile as re_compile, Match, Pattern
from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING

from more_itertools import chunked

from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.utils.jinja import jinja_env

from .aai_element import AaiElement

if TYPE_CHECKING:
    from jinja2 import Template


@dataclass
class AaiBulkRequest:
    """Single operation which is going to be a part of an A&AI bulk request.

    Attributes:
        action (str): Action of the operation.
            NOTE(review): presumably an HTTP-like verb (put, patch, delete),
            confirm against the bulk request template.
        uri (str): URI of the resource the operation is about.
        body (Dict[Any, Any]): Payload of the operation.

    """

    action: str
    uri: str
    body: Dict[Any, Any]


@dataclass
class AaiBulkResponse:
    """Result of a single operation sent within an A&AI bulk request.

    Attributes:
        action (str): Action reported for the operation in the response.
        uri (str): URI reported for the operation in the response.
        status_code (int): Status code of the operation response.
        body (str): Body of the operation response.

    """

    action: str
    uri: str
    status_code: int
    body: str


class AaiBulk(AaiElement):
    """A&AI bulk class.

    Use it to send bulk request to A&AI. With bulk request you can send
        multiple requests at once.
    """

    BULK_TEMPLATE = "aai_bulk.json.j2"
    FIRST_REGEX_GROUP_NAME = "index1"
    SECOND_REGEX_GROUP_NAME = "index2"
    # Two message shapes carry the index of the failing operation:
    #   "Error with operation <index>" and "Operation <index> with action".
    # Each one is captured by its own named group.
    OPERATION_INDEX_REGEX = (fr".*((Error with operation (?P<{FIRST_REGEX_GROUP_NAME}>\d+))|"
                             fr"(Operation (?P<{SECOND_REGEX_GROUP_NAME}>\d+) with action))")

    def __init__(self,
                 chunk_size: int = settings.AAI_BULK_CHUNK) -> None:
        """Init AAI bulk class.

        Args:
            chunk_size (int, optional): How many operations are going to be send with one request.
                Defaults to settings.AAI_BULK_CHUNK.
        """
        super().__init__()
        self.chunk_size: int = chunk_size
        # Template is loaded lazily by the `jinja_template` property.
        self._jinja_template: Optional["Template"] = None
        # Stays None until the first failing request is recorded.
        self._failed_requests: Optional[List[AaiBulkRequest]] = None
        self._operation_index_regex: Pattern = re_compile(self.OPERATION_INDEX_REGEX)

    @property
    def url(self) -> str:
        """Bulk url.

        Returns:
            str: A&AI bulk API url.

        """
        return f"{self.base_url}{self.api_version}/bulk"

    @property
    def jinja_template(self) -> "Template":
        """Jinja template property.

        As we are reusing same template multiple times it's better to load it once.

        Returns:
            Template: Template of A&AI bulk request body

        """
        if not self._jinja_template:
            self._jinja_template = jinja_env().get_template(self.BULK_TEMPLATE)
        return self._jinja_template

    @property
    def single_transaction_url(self) -> str:
        """Single transaction url.

        Returns:
            str: A&AI bulk single transaction url.
        """
        return f"{self.url}/single-transaction"

    @property
    def failed_requests(self) -> List[AaiBulkRequest]:
        """Collection of failed requests.

        If user decide to retry bulk without failing request then they are
            stored in given collection for logging/debugging purposes.

        Returns:
            List[AaiBulkRequest]: List of failing bulk requests. A new empty
                list if no request failed so far.

        """
        return self._failed_requests or []

    def _add_failed_request(self, failed_request: AaiBulkRequest) -> None:
        """Add failed request into internal `failed_requests` collection.

        Args:
            failed_request (AaiBulkRequest): Request which failed

        """
        if self._failed_requests is None:
            self._failed_requests = []
        self._failed_requests.append(failed_request)

    def _send_single_transaction_request(self,
                                         aai_requests: List[AaiBulkRequest]
                                         ) -> Iterable[AaiBulkResponse]:
        """Send single transaction request.

        Using send_message_json send chunk of requests to A&AI

        Args:
            aai_requests (List[AaiBulkRequest]): List of requests to be sent

        Yields:
            AaiBulkResponse: Response for each bulk request

        """
        if not aai_requests:
            self._logger.info("No operations to send, abort")
            return
        # The HTTP call is done before the first response is yielded, so
        # an error raised by it never leaves partially yielded results.
        operation_responses = self.send_message_json(
            "POST",
            "Send bulk A&AI request",
            self.single_transaction_url,
            data=self.jinja_template.render(operations=aai_requests)
        )["operation-responses"]
        for response in operation_responses:
            yield AaiBulkResponse(
                action=response["action"],
                uri=response["uri"],
                status_code=response["response-status-code"],
                body=response["response-body"]
            )

    def _get_failed_operation_index(self, failed_response_body: Optional[str]) -> int:
        """Get index of an operation which failed.

        Using regular expressions we are able to read an index of request which we sent
            and failed. Thanks to that we would be able to debug it, remove it and try to
            rerun whole bulk request.

        Args:
            failed_response_body (Optional[str]): Body of failed A&AI bulk request

        Returns:
            int: Index of a request which failed. -1 if regular expression didn't find
                any match

        """
        if not failed_response_body:
            return -1
        # `search` instead of `match`: the leading ".*" of the pattern does not
        # cross newlines, so `match` would miss an index reported on any line
        # but the first one. For bodies `match` handled both give the same result.
        match: Optional[Match] = self._operation_index_regex.search(failed_response_body)
        if not match:
            return -1
        groups: Dict[str, Any] = match.groupdict()
        # Exactly one of the alternatives matched, the other group is None.
        str_index: str = (groups[self.FIRST_REGEX_GROUP_NAME]
                          or groups[self.SECOND_REGEX_GROUP_NAME])
        return int(str_index)

    def _send_chunk(self, aai_requests: List[AaiBulkRequest],
                    remove_failed_operation_on_failure: bool = True) -> Iterable[AaiBulkResponse]:
        """Send a bulk requests chunk.

        If it failed and `remove_failed_operation_on_failure` is set
            then try to find which bulk request is failing, remove it and retry.
            Removed requests are available in `failed_requests` property.

        Note that failing requests are removed from `aai_requests` in place.

        Args:
            aai_requests (List[AaiBulkRequest]): List of requests to send
            remove_failed_operation_on_failure (bool, optional): Flag to determine if
                find failing request, remove it and retry. Defaults to True.

        Raises:
            APIError: Sending failed and either retrying is disabled or
                the failing operation can't be identified.

        Yields:
            AaiBulkResponse: Response for each bulk request

        """
        # Every retry removes one request from the chunk, so the loop is
        # bounded by the chunk length.
        while True:
            try:
                yield from self._send_single_transaction_request(aai_requests)
            except APIError as api_error:
                if not remove_failed_operation_on_failure:
                    raise
                operation_index: int = self._get_failed_operation_index(
                    api_error.response_text)
                if operation_index < 0:
                    self._logger.error("Wanted to remove failing bulk operation, "
                                       "but there is no index on API response, "
                                       "probably it's an A&AI error!")
                    raise
                if operation_index >= len(aai_requests):
                    # Re-raise the API error instead of hiding it behind
                    # an IndexError coming from `pop`.
                    self._logger.error("Wanted to remove failing bulk operation, "
                                       "but index %d from API response is out of range "
                                       "(%d operations sent)",
                                       operation_index, len(aai_requests))
                    raise
                self._add_failed_request(aai_requests.pop(operation_index))
            else:
                return

    def single_transaction(self,
                           aai_requests: Iterable[AaiBulkRequest],
                           remove_failed_operation_on_failure: bool = True
                           ) -> Iterable[AaiBulkResponse]:
        """Send aai requests using A&AI single transaction API.

        Requests are split into chunks of `chunk_size` and every chunk
            is sent with a separate call.

        Args:
            aai_requests (Iterable[AaiBulkRequest]): Requests to send
            remove_failed_operation_on_failure (bool, optional): Flag to determine if
                find failing request, remove it and retry. Defaults to True.

        Yields:
            AaiBulkResponse: Response for each bulk request

        """
        for requests_chunk in chunked(aai_requests, self.chunk_size):
            yield from self._send_chunk(requests_chunk, remove_failed_operation_on_failure)