From 00c91ce35acabc66cb781482b9dfbff20e6a9b62 Mon Sep 17 00:00:00 2001 From: Michal Jagiello Date: Wed, 10 Jan 2024 14:16:29 +0100 Subject: Refactor bulk class Find failing requests and retry bulk without them Issue-ID: TEST-404 Change-Id: Ia60f7761bcee99bcb6d0af0381db267d259e77a2 Signed-off-by: Michal Jagiello --- src/onapsdk/aai/bulk.py | 196 ++++++++++++++++++++++++++++++++++++++------ src/onapsdk/exceptions.py | 24 +++++- src/onapsdk/onap_service.py | 10 ++- tests/test_aai_bulk.py | 165 ++++++++++++++++++++++++++++++++++++- 4 files changed, 364 insertions(+), 31 deletions(-) diff --git a/src/onapsdk/aai/bulk.py b/src/onapsdk/aai/bulk.py index 435a0b4..1f676c0 100644 --- a/src/onapsdk/aai/bulk.py +++ b/src/onapsdk/aai/bulk.py @@ -14,15 +14,20 @@ # limitations under the License. from dataclasses import dataclass -from typing import Any, Dict, Iterable +from re import compile as re_compile, Match, Pattern +from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING from more_itertools import chunked from onapsdk.configuration import settings +from onapsdk.exceptions import APIError from onapsdk.utils.jinja import jinja_env from .aai_element import AaiElement +if TYPE_CHECKING: + from jinja2 import Template + @dataclass class AaiBulkRequest: @@ -50,6 +55,26 @@ class AaiBulk(AaiElement): multiple requests at once. """ + BULK_TEMPLATE = "aai_bulk.json.j2" + FIRST_REGEX_GROUP_NAME = "index1" + SECOND_REGEX_GROUP_NAME = "index2" + OPERATION_INDEX_REGEX = (fr".*((Error with operation (?P<{FIRST_REGEX_GROUP_NAME}>\d+))|" + fr"(Operation (?P<{SECOND_REGEX_GROUP_NAME}>\d+) with action))") + + def __init__(self, + chunk_size: int = settings.AAI_BULK_CHUNK) -> None: + """Init AAI bulk class. + + Args: + chunk_size (int, optional): How many operations are going to be send with one request. + Defaults to settings.AAI_BULK_CHUNK. + """ + super().__init__() + self.chunk_size: int = chunk_size + self._jinja_template: Optional["Template"] = None + self._failed_requests: Optional[List[AaiBulkRequest]] = None + self._operation_index_regex: Pattern = re_compile(self.OPERATION_INDEX_REGEX) + @property def url(self) -> str: """Bulk url. @@ -60,31 +85,156 @@ class AaiBulk(AaiElement): """ return f"{self.base_url}{self.api_version}/bulk" - @classmethod - def single_transaction(cls, aai_requests: Iterable[AaiBulkRequest])\ - -> Iterable[AaiBulkResponse]: - """Singe transaction bulk request. + @property + def jinja_template(self) -> "Template": + """Jinja template propery. + + As we are reusing same template multiple times it's better to load it once. + + Returns: + Template: Template of A&AI bulk request body + + """ + if not self._jinja_template: + self._jinja_template = jinja_env().get_template(self.BULK_TEMPLATE) + return self._jinja_template + + @property + def single_transaction_url(self) -> str: + """Single transaction url. + + Returns: + str: A&AI bulk single transaction url. + """ + return f"{self.url}/single-transaction" + + @property + def failed_requests(self) -> List[AaiBulkRequest]: + """Collection of failed requests. + + If user decide to retry bulk without failing request then they are + stored in given collection for logging/debugging purposes. + + Returns: + List[AaiBulkRequest]: List of failing bulk requests + + """ + if not self._failed_requests: + return [] + return self._failed_requests + + def _add_failed_request(self, failed_request: AaiBulkRequest) -> None: + """Add failed request into internal `failed_requests` collection. + + Args: + failed_request (AaiBulkRequest): Request which failed + + """ + if not self._failed_requests: + self._failed_requests = [] + self._failed_requests.append(failed_request) + + def _send_single_transaction_request(self, + aai_requests: List[AaiBulkResponse] + ) -> Iterable[AaiBulkResponse]: + """Send single transaction request. + + Using send_message_json send chunk of requests to A&AI + + Args: + aai_requests (List[AaiBulkResponse]): List of requests to be sent + + Yields: + AaiBulkResponse: Response for each bulk request + + """ + if not aai_requests: + self._logger.info("No operations to send, abort") + return + for response in self.send_message_json( + "POST", + "Send bulk A&AI request", + self.single_transaction_url, + data=self.jinja_template.render(operations=aai_requests) + )["operation-responses"]: + yield AaiBulkResponse( + action=response["action"], + uri=response["uri"], + status_code=response["response-status-code"], + body=response["response-body"] + ) + + def _get_failed_operation_index(self, failed_response_body: Optional[str]) -> int: + """Get index of an operation which failed. + + Using regular expressions we are able to read an index of request which we sent + and failed. Thanks to that we would be able to debug it, remove it and try to + rerun whole bulk request. + + Args: + failed_response_body (Optional[str]): Body of failed A&AI bulk request + + Returns: + int: Index of a request which failed. -1 if regular expression didn't find + any match + + """ + if not failed_response_body: + return -1 + match: Match = self._operation_index_regex.match(failed_response_body) + if not match: + return -1 + groupsdict: Dict[str, Any] = match.groupdict() + if groupsdict[self.FIRST_REGEX_GROUP_NAME]: + str_index: str = groupsdict[self.FIRST_REGEX_GROUP_NAME] + else: + str_index = groupsdict[self.SECOND_REGEX_GROUP_NAME] + return int(str_index) + + def _send_chunk(self, aai_requests: List[AaiBulkRequest], + remove_failed_operation_on_failure: bool = True) -> Iterable[AaiBulkResponse]: + """Send a bulk requests chunk. + + If it failed and `remove_failed_operation_on_failure` is set + then try to find which bulk request is failing, remove it and retry. + + Args: + aai_requests (List[AaiBulkRequest]): List of requests to send + remove_failed_operation_on_failure (bool, optional): Flag to determine if + find failing request, remove it and retry. Defaults to True. + + Yields: + AaiBulkResponse: Response for each bulk request + + """ + try: + yield from self._send_single_transaction_request(aai_requests) + except APIError as api_error: + if not remove_failed_operation_on_failure: + raise + operation_index: int = self._get_failed_operation_index(api_error.response_text) + if operation_index < 0: + self._logger.error("Wanted to remove failing bulk operation, " + "but there is no index on API response, " + "probably it's an A&AI error!") + raise + self._add_failed_request(aai_requests.pop(operation_index)) + yield from self._send_chunk(aai_requests, remove_failed_operation_on_failure) + + def single_transaction(self, + aai_requests: Iterable[AaiBulkRequest], + remove_failed_operation_on_failure: bool = True + ) -> Iterable[AaiBulkResponse]: + """Send aai requests using A&AI single transaction API. Args: - aai_requests (Iterable[AaiBulkRequest]): Iterable object of requests to be sent - as a bulk request. + aai_requests (List[AaiBulkRequest]): List of requests to send + remove_failed_operation_on_failure (bool, optional): Flag to determine if + find failing request, remove it and retry. Defaults to True. Yields: - Iterator[Iterable[AaiBulkResponse]]: Bulk request responses. Each object - correspond to the sent request. + AaiBulkResponse: Response for each bulk request """ - for requests_chunk in chunked(aai_requests, settings.AAI_BULK_CHUNK): - for response in cls.send_message_json(\ - "POST",\ - "Send bulk A&AI request",\ - f"{cls.base_url}{cls.api_version}/bulk/single-transaction",\ - data=jinja_env().get_template(\ - "aai_bulk.json.j2").render(operations=requests_chunk)\ - )["operation-responses"]: - yield AaiBulkResponse( - action=response["action"], - uri=response["uri"], - status_code=response["response-status-code"], - body=response["response-body"] - ) + for requests_chunk in chunked(aai_requests, self.chunk_size): + yield from self._send_chunk(requests_chunk, remove_failed_operation_on_failure) diff --git a/src/onapsdk/exceptions.py b/src/onapsdk/exceptions.py index 76af60e..a9977fd 100644 --- a/src/onapsdk/exceptions.py +++ b/src/onapsdk/exceptions.py @@ -33,7 +33,8 @@ class APIError(RequestError): def __init__(self, message: Optional[str] = None, - response_status_code: Optional[int] = None) -> None: + response_status_code: Optional[int] = None, + response_text: Optional[str] = None) -> None: """Init api error exception. Save message and optional response status code. @@ -48,6 +49,7 @@ class APIError(RequestError): else: super().__init__() self._response_status_code: int = response_status_code if response_status_code else 0 + self._response_text: Optional[str] = response_text @property def response_status_code(self) -> int: @@ -69,6 +71,26 @@ class APIError(RequestError): """ self._response_status_code = status_code + @property + def response_text(self) -> Optional[str]: + """Response status code property. + + Returns: + int: Response status code. If not set, returns 0 + + """ + return self._response_text + + @response_text.setter + def response_text(self, response_text: str) -> None: + """Response status code property setter. + + Args: + status_code (int): Response status code + + """ + self._response_text = response_text + class InvalidResponse(RequestError): """Unable to decode response.""" diff --git a/src/onapsdk/onap_service.py b/src/onapsdk/onap_service.py index 06c358c..a219f5a 100644 --- a/src/onapsdk/onap_service.py +++ b/src/onapsdk/onap_service.py @@ -175,11 +175,13 @@ class OnapService(ABC): msg = f'Code: {cause.response.status_code}. Info: {cause.response.text}.' if cause.response.status_code == 404: - exc = ResourceNotFound(msg) + exc = ResourceNotFound(msg, + response_status_code=cause.response.status_code, + response_text=cause.response.text) else: - exc = APIError(msg) - - exc.response_status_code = cause.response.status_code + exc = APIError(msg, + response_status_code=cause.response.status_code, + response_text=cause.response.text) raise exc from cause diff --git a/tests/test_aai_bulk.py b/tests/test_aai_bulk.py index 0567055..880cc5c 100644 --- a/tests/test_aai_bulk.py +++ b/tests/test_aai_bulk.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import mock +from pytest import raises from onapsdk.aai.bulk import AaiBulk, AaiBulkRequest, AaiBulkResponse +from onapsdk.exceptions import APIError BULK_RESPONSES = { @@ -38,7 +40,7 @@ BULK_RESPONSES = { def test_aai_bulk(mock_send_message_json): assert AaiBulk().url.endswith("bulk") mock_send_message_json.return_value = BULK_RESPONSES - responses = list(AaiBulk.single_transaction( + responses = list(AaiBulk().single_transaction( [ AaiBulkRequest( action="post", @@ -65,7 +67,7 @@ def test_aai_bulk(mock_send_message_json): # Check if requests was splitted into chunks for generator mock_send_message_json.reset_mock() - responses = list(AaiBulk.single_transaction( + responses = list(AaiBulk().single_transaction( ( AaiBulkRequest( action="post", @@ -78,7 +80,7 @@ def test_aai_bulk(mock_send_message_json): # Check if requests was splitted into chunks for list mock_send_message_json.reset_mock() - responses = list(AaiBulk.single_transaction( + responses = list(AaiBulk().single_transaction( [ AaiBulkRequest( action="post", @@ -88,3 +90,160 @@ def test_aai_bulk(mock_send_message_json): ] )) assert mock_send_message_json.call_count == 2 + + mock_send_message_json.reset_mock() + responses = list(AaiBulk().single_transaction([])) + assert mock_send_message_json.call_count == 0 + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_retry_and_remove_first_which_is_failing(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0"), BULK_RESPONSES] + aai_bulk = AaiBulk() + assert len(aai_bulk.failed_requests) == 0 + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )])) + assert mock_send_message_json.call_count == 2 + assert len(aai_bulk.failed_requests) == 1 + assert aai_bulk.failed_requests[0].action == "post" + assert aai_bulk.failed_requests[0].uri == "test-uri" + assert aai_bulk.failed_requests[0].body == {"blabla: blabla"} + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_retry_and_remove_second_which_is_failing(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Error with operation 1"), BULK_RESPONSES] + aai_bulk = AaiBulk() + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )])) + assert mock_send_message_json.call_count == 2 + assert len(aai_bulk.failed_requests) == 1 + assert aai_bulk.failed_requests[0].action == "get" + assert aai_bulk.failed_requests[0].uri == "test-uri" + assert aai_bulk.failed_requests[0].body == {} + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_retry_and_remove_both_which_are_failing_reverse_order(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Error with operation 1"), APIError(response_text="Error with operation 0")] + aai_bulk = AaiBulk() + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )])) + assert mock_send_message_json.call_count == 2 + assert len(aai_bulk.failed_requests) == 2 + assert aai_bulk.failed_requests[0].action == "get" + assert aai_bulk.failed_requests[0].uri == "test-uri" + assert aai_bulk.failed_requests[0].body == {} + assert aai_bulk.failed_requests[1].action == "post" + assert aai_bulk.failed_requests[1].uri == "test-uri" + assert aai_bulk.failed_requests[1].body == {"blabla: blabla"} + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_retry_and_remove_both_which_are_failing_in_order(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0"), APIError(response_text="Error with operation 0")] + aai_bulk = AaiBulk() + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )])) + assert mock_send_message_json.call_count == 2 + assert len(aai_bulk.failed_requests) == 2 + assert aai_bulk.failed_requests[0].action == "post" + assert aai_bulk.failed_requests[0].uri == "test-uri" + assert aai_bulk.failed_requests[0].body == {"blabla: blabla"} + assert aai_bulk.failed_requests[1].action == "get" + assert aai_bulk.failed_requests[1].uri == "test-uri" + assert aai_bulk.failed_requests[1].body == {} + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_parse_invalid_response_text(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Invalid response text")] + aai_bulk = AaiBulk() + with raises(APIError): + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )])) + + +@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json") +def test_aai_bulk_do_not_retry(mock_send_message_json): + mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0")] + aai_bulk = AaiBulk() + with raises(APIError): + list(aai_bulk.single_transaction([ + AaiBulkRequest( + action="post", + uri="test-uri", + body={"blabla: blabla"} + ), + AaiBulkRequest( + action="get", + uri="test-uri", + body={} + )], remove_failed_operation_on_failure=False)) + assert len(aai_bulk.failed_requests) == 0 + assert mock_send_message_json.call_count == 1 + + +def test_get_failed_operation_index(): + aai_bulk = AaiBulk() + def get_formatted_response_error_with_operation(index: int) -> str: + return ('{"requestError":{"serviceException":{"messageId":"SVC3000","text":"Invalid input performing %1 on %2 (msg=%3)' + f' (ec=%4)","variables":["POST","v27/bulk/single-transaction","Invalid input performing %1 on %2:Error with operation {index}: ' + 'Missing required property: physical-location-type,Missing required property: street1,Missing required property: city,' + 'Missing required property: postal-code,Missing required property: country,Missing required property: region","ERR.5.2.3000"]}}}') + def get_formatted_response_not_found(index: int) -> str: + return ('{"requestError":{"serviceException":{"messageId":"SVC3000","text":"Invalid input performing %1 on %2 (msg=%3) ' + f'(ec=%4)","variables":["POST","v27/bulk/single-transaction","Invalid input performing %1 on %2:Operation {index} with action ' + '(DELETE) on uri (/cloud-infrastructure/complexes/complex/test-parse-bulk-response) failed with status code (404), error code ' + '(ERR.5.4.6114) and msg (Node Not Found:No Node of type complex found at: /cloud-infrastructure/complexes/complex/test-parse-bulk-response)","ERR.5.2.3000"]}}}') + assert aai_bulk._get_failed_operation_index(None) == -1 + assert aai_bulk._get_failed_operation_index("Something on what there is no index") == -1 + assert aai_bulk._get_failed_operation_index("There is an index: 0 but it's not a valid string") == -1 + for i in [pow(10, x) for x in range(6)]: + assert aai_bulk._get_failed_operation_index(get_formatted_response_error_with_operation(i)) == i + assert aai_bulk._get_failed_operation_index(get_formatted_response_not_found(i)) == i -- cgit 1.2.3-korg