aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichal Jagiello <michal.jagiello@t-mobile.pl>2024-01-10 14:16:29 +0100
committerMichal Jagiello <michal.jagiello@t-mobile.pl>2024-01-11 13:28:10 +0100
commit00c91ce35acabc66cb781482b9dfbff20e6a9b62 (patch)
treedf5eb413f7a91ef2cb5d99f104a0e6ce91840d38
parentb161a121e3cd0e981430777706f2d925966c850f (diff)
Refactor bulk class
Find failing requests and retry bulk without them Issue-ID: TEST-404 Change-Id: Ia60f7761bcee99bcb6d0af0381db267d259e77a2 Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl>
-rw-r--r--src/onapsdk/aai/bulk.py196
-rw-r--r--src/onapsdk/exceptions.py24
-rw-r--r--src/onapsdk/onap_service.py10
-rw-r--r--tests/test_aai_bulk.py165
4 files changed, 364 insertions, 31 deletions
diff --git a/src/onapsdk/aai/bulk.py b/src/onapsdk/aai/bulk.py
index 435a0b4..1f676c0 100644
--- a/src/onapsdk/aai/bulk.py
+++ b/src/onapsdk/aai/bulk.py
@@ -14,15 +14,20 @@
# limitations under the License.
from dataclasses import dataclass
-from typing import Any, Dict, Iterable
+from re import compile as re_compile, Match, Pattern
+from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING
from more_itertools import chunked
from onapsdk.configuration import settings
+from onapsdk.exceptions import APIError
from onapsdk.utils.jinja import jinja_env
from .aai_element import AaiElement
+if TYPE_CHECKING:
+ from jinja2 import Template
+
@dataclass
class AaiBulkRequest:
@@ -50,6 +55,26 @@ class AaiBulk(AaiElement):
multiple requests at once.
"""
+ BULK_TEMPLATE = "aai_bulk.json.j2"
+ FIRST_REGEX_GROUP_NAME = "index1"
+ SECOND_REGEX_GROUP_NAME = "index2"
+ OPERATION_INDEX_REGEX = (fr".*((Error with operation (?P<{FIRST_REGEX_GROUP_NAME}>\d+))|"
+ fr"(Operation (?P<{SECOND_REGEX_GROUP_NAME}>\d+) with action))")
+
+ def __init__(self,
+ chunk_size: int = settings.AAI_BULK_CHUNK) -> None:
+ """Init AAI bulk class.
+
+ Args:
+ chunk_size (int, optional): How many operations are going to be send with one request.
+ Defaults to settings.AAI_BULK_CHUNK.
+ """
+ super().__init__()
+ self.chunk_size: int = chunk_size
+ self._jinja_template: Optional["Template"] = None
+ self._failed_requests: Optional[List[AaiBulkRequest]] = None
+ self._operation_index_regex: Pattern = re_compile(self.OPERATION_INDEX_REGEX)
+
@property
def url(self) -> str:
"""Bulk url.
@@ -60,31 +85,156 @@ class AaiBulk(AaiElement):
"""
return f"{self.base_url}{self.api_version}/bulk"
- @classmethod
- def single_transaction(cls, aai_requests: Iterable[AaiBulkRequest])\
- -> Iterable[AaiBulkResponse]:
- """Singe transaction bulk request.
+ @property
+ def jinja_template(self) -> "Template":
+ """Jinja template propery.
+
+ As we are reusing same template multiple times it's better to load it once.
+
+ Returns:
+ Template: Template of A&AI bulk request body
+
+ """
+ if not self._jinja_template:
+ self._jinja_template = jinja_env().get_template(self.BULK_TEMPLATE)
+ return self._jinja_template
+
+ @property
+ def single_transaction_url(self) -> str:
+ """Single transaction url.
+
+ Returns:
+ str: A&AI bulk single transaction url.
+ """
+ return f"{self.url}/single-transaction"
+
+ @property
+ def failed_requests(self) -> List[AaiBulkRequest]:
+ """Collection of failed requests.
+
+ If user decide to retry bulk without failing request then they are
+ stored in given collection for logging/debugging purposes.
+
+ Returns:
+ List[AaiBulkRequest]: List of failing bulk requests
+
+ """
+ if not self._failed_requests:
+ return []
+ return self._failed_requests
+
+ def _add_failed_request(self, failed_request: AaiBulkRequest) -> None:
+ """Add failed request into internal `failed_requests` collection.
+
+ Args:
+ failed_request (AaiBulkRequest): Request which failed
+
+ """
+ if not self._failed_requests:
+ self._failed_requests = []
+ self._failed_requests.append(failed_request)
+
+ def _send_single_transaction_request(self,
+ aai_requests: List[AaiBulkResponse]
+ ) -> Iterable[AaiBulkResponse]:
+ """Send single transaction request.
+
+ Using send_message_json send chunk of requests to A&AI
+
+ Args:
+ aai_requests (List[AaiBulkResponse]): List of requests to be sent
+
+ Yields:
+ AaiBulkResponse: Response for each bulk request
+
+ """
+ if not aai_requests:
+ self._logger.info("No operations to send, abort")
+ return
+ for response in self.send_message_json(
+ "POST",
+ "Send bulk A&AI request",
+ self.single_transaction_url,
+ data=self.jinja_template.render(operations=aai_requests)
+ )["operation-responses"]:
+ yield AaiBulkResponse(
+ action=response["action"],
+ uri=response["uri"],
+ status_code=response["response-status-code"],
+ body=response["response-body"]
+ )
+
+ def _get_failed_operation_index(self, failed_response_body: Optional[str]) -> int:
+ """Get index of an operation which failed.
+
+ Using regular expressions we are able to read an index of request which we sent
+ and failed. Thanks to that we would be able to debug it, remove it and try to
+ rerun whole bulk request.
+
+ Args:
+ failed_response_body (Optional[str]): Body of failed A&AI bulk request
+
+ Returns:
+ int: Index of a request which failed. -1 if regular expression didn't find
+ any match
+
+ """
+ if not failed_response_body:
+ return -1
+ match: Match = self._operation_index_regex.match(failed_response_body)
+ if not match:
+ return -1
+ groupsdict: Dict[str, Any] = match.groupdict()
+ if groupsdict[self.FIRST_REGEX_GROUP_NAME]:
+ str_index: str = groupsdict[self.FIRST_REGEX_GROUP_NAME]
+ else:
+ str_index = groupsdict[self.SECOND_REGEX_GROUP_NAME]
+ return int(str_index)
+
+ def _send_chunk(self, aai_requests: List[AaiBulkRequest],
+ remove_failed_operation_on_failure: bool = True) -> Iterable[AaiBulkResponse]:
+ """Send a bulk requests chunk.
+
+ If it failed and `remove_failed_operation_on_failure` is set
+ then try to find which bulk request is failing, remove it and retry.
+
+ Args:
+ aai_requests (List[AaiBulkRequest]): List of requests to send
+ remove_failed_operation_on_failure (bool, optional): Flag to determine if
+ find failing request, remove it and retry. Defaults to True.
+
+ Yields:
+ AaiBulkResponse: Response for each bulk request
+
+ """
+ try:
+ yield from self._send_single_transaction_request(aai_requests)
+ except APIError as api_error:
+ if not remove_failed_operation_on_failure:
+ raise
+ operation_index: int = self._get_failed_operation_index(api_error.response_text)
+ if operation_index < 0:
+ self._logger.error("Wanted to remove failing bulk operation, "
+ "but there is no index on API response, "
+ "probably it's an A&AI error!")
+ raise
+ self._add_failed_request(aai_requests.pop(operation_index))
+ yield from self._send_chunk(aai_requests, remove_failed_operation_on_failure)
+
+ def single_transaction(self,
+ aai_requests: Iterable[AaiBulkRequest],
+ remove_failed_operation_on_failure: bool = True
+ ) -> Iterable[AaiBulkResponse]:
+ """Send aai requests using A&AI single transaction API.
Args:
- aai_requests (Iterable[AaiBulkRequest]): Iterable object of requests to be sent
- as a bulk request.
+ aai_requests (List[AaiBulkRequest]): List of requests to send
+ remove_failed_operation_on_failure (bool, optional): Flag to determine if
+ find failing request, remove it and retry. Defaults to True.
Yields:
- Iterator[Iterable[AaiBulkResponse]]: Bulk request responses. Each object
- correspond to the sent request.
+ AaiBulkResponse: Response for each bulk request
"""
- for requests_chunk in chunked(aai_requests, settings.AAI_BULK_CHUNK):
- for response in cls.send_message_json(\
- "POST",\
- "Send bulk A&AI request",\
- f"{cls.base_url}{cls.api_version}/bulk/single-transaction",\
- data=jinja_env().get_template(\
- "aai_bulk.json.j2").render(operations=requests_chunk)\
- )["operation-responses"]:
- yield AaiBulkResponse(
- action=response["action"],
- uri=response["uri"],
- status_code=response["response-status-code"],
- body=response["response-body"]
- )
+ for requests_chunk in chunked(aai_requests, self.chunk_size):
+ yield from self._send_chunk(requests_chunk, remove_failed_operation_on_failure)
diff --git a/src/onapsdk/exceptions.py b/src/onapsdk/exceptions.py
index 76af60e..a9977fd 100644
--- a/src/onapsdk/exceptions.py
+++ b/src/onapsdk/exceptions.py
@@ -33,7 +33,8 @@ class APIError(RequestError):
def __init__(self,
message: Optional[str] = None,
- response_status_code: Optional[int] = None) -> None:
+ response_status_code: Optional[int] = None,
+ response_text: Optional[str] = None) -> None:
"""Init api error exception.
Save message and optional response status code.
@@ -48,6 +49,7 @@ class APIError(RequestError):
else:
super().__init__()
self._response_status_code: int = response_status_code if response_status_code else 0
+ self._response_text: Optional[str] = response_text
@property
def response_status_code(self) -> int:
@@ -69,6 +71,26 @@ class APIError(RequestError):
"""
self._response_status_code = status_code
+ @property
+ def response_text(self) -> Optional[str]:
+ """Response status code property.
+
+ Returns:
+ int: Response status code. If not set, returns 0
+
+ """
+ return self._response_text
+
+ @response_text.setter
+ def response_text(self, response_text: str) -> None:
+ """Response status code property setter.
+
+ Args:
+ status_code (int): Response status code
+
+ """
+ self._response_text = response_text
+
class InvalidResponse(RequestError):
"""Unable to decode response."""
diff --git a/src/onapsdk/onap_service.py b/src/onapsdk/onap_service.py
index 06c358c..a219f5a 100644
--- a/src/onapsdk/onap_service.py
+++ b/src/onapsdk/onap_service.py
@@ -175,11 +175,13 @@ class OnapService(ABC):
msg = f'Code: {cause.response.status_code}. Info: {cause.response.text}.'
if cause.response.status_code == 404:
- exc = ResourceNotFound(msg)
+ exc = ResourceNotFound(msg,
+ response_status_code=cause.response.status_code,
+ response_text=cause.response.text)
else:
- exc = APIError(msg)
-
- exc.response_status_code = cause.response.status_code
+ exc = APIError(msg,
+ response_status_code=cause.response.status_code,
+ response_text=cause.response.text)
raise exc from cause
diff --git a/tests/test_aai_bulk.py b/tests/test_aai_bulk.py
index 0567055..880cc5c 100644
--- a/tests/test_aai_bulk.py
+++ b/tests/test_aai_bulk.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
+from pytest import raises
from onapsdk.aai.bulk import AaiBulk, AaiBulkRequest, AaiBulkResponse
+from onapsdk.exceptions import APIError
BULK_RESPONSES = {
@@ -38,7 +40,7 @@ BULK_RESPONSES = {
def test_aai_bulk(mock_send_message_json):
assert AaiBulk().url.endswith("bulk")
mock_send_message_json.return_value = BULK_RESPONSES
- responses = list(AaiBulk.single_transaction(
+ responses = list(AaiBulk().single_transaction(
[
AaiBulkRequest(
action="post",
@@ -65,7 +67,7 @@ def test_aai_bulk(mock_send_message_json):
# Check if requests was splitted into chunks for generator
mock_send_message_json.reset_mock()
- responses = list(AaiBulk.single_transaction(
+ responses = list(AaiBulk().single_transaction(
(
AaiBulkRequest(
action="post",
@@ -78,7 +80,7 @@ def test_aai_bulk(mock_send_message_json):
# Check if requests was splitted into chunks for list
mock_send_message_json.reset_mock()
- responses = list(AaiBulk.single_transaction(
+ responses = list(AaiBulk().single_transaction(
[
AaiBulkRequest(
action="post",
@@ -88,3 +90,160 @@ def test_aai_bulk(mock_send_message_json):
]
))
assert mock_send_message_json.call_count == 2
+
+ mock_send_message_json.reset_mock()
+ responses = list(AaiBulk().single_transaction([]))
+ assert mock_send_message_json.call_count == 0
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_retry_and_remove_first_which_is_failing(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0"), BULK_RESPONSES]
+ aai_bulk = AaiBulk()
+ assert len(aai_bulk.failed_requests) == 0
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )]))
+ assert mock_send_message_json.call_count == 2
+ assert len(aai_bulk.failed_requests) == 1
+ assert aai_bulk.failed_requests[0].action == "post"
+ assert aai_bulk.failed_requests[0].uri == "test-uri"
+ assert aai_bulk.failed_requests[0].body == {"blabla: blabla"}
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_retry_and_remove_second_which_is_failing(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Error with operation 1"), BULK_RESPONSES]
+ aai_bulk = AaiBulk()
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )]))
+ assert mock_send_message_json.call_count == 2
+ assert len(aai_bulk.failed_requests) == 1
+ assert aai_bulk.failed_requests[0].action == "get"
+ assert aai_bulk.failed_requests[0].uri == "test-uri"
+ assert aai_bulk.failed_requests[0].body == {}
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_retry_and_remove_both_which_are_failing_reverse_order(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Error with operation 1"), APIError(response_text="Error with operation 0")]
+ aai_bulk = AaiBulk()
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )]))
+ assert mock_send_message_json.call_count == 2
+ assert len(aai_bulk.failed_requests) == 2
+ assert aai_bulk.failed_requests[0].action == "get"
+ assert aai_bulk.failed_requests[0].uri == "test-uri"
+ assert aai_bulk.failed_requests[0].body == {}
+ assert aai_bulk.failed_requests[1].action == "post"
+ assert aai_bulk.failed_requests[1].uri == "test-uri"
+ assert aai_bulk.failed_requests[1].body == {"blabla: blabla"}
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_retry_and_remove_both_which_are_failing_in_order(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0"), APIError(response_text="Error with operation 0")]
+ aai_bulk = AaiBulk()
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )]))
+ assert mock_send_message_json.call_count == 2
+ assert len(aai_bulk.failed_requests) == 2
+ assert aai_bulk.failed_requests[0].action == "post"
+ assert aai_bulk.failed_requests[0].uri == "test-uri"
+ assert aai_bulk.failed_requests[0].body == {"blabla: blabla"}
+ assert aai_bulk.failed_requests[1].action == "get"
+ assert aai_bulk.failed_requests[1].uri == "test-uri"
+ assert aai_bulk.failed_requests[1].body == {}
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_parse_invalid_response_text(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Invalid response text")]
+ aai_bulk = AaiBulk()
+ with raises(APIError):
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )]))
+
+
+@mock.patch("onapsdk.aai.bulk.AaiBulk.send_message_json")
+def test_aai_bulk_do_not_retry(mock_send_message_json):
+ mock_send_message_json.side_effect = [APIError(response_text="Error with operation 0")]
+ aai_bulk = AaiBulk()
+ with raises(APIError):
+ list(aai_bulk.single_transaction([
+ AaiBulkRequest(
+ action="post",
+ uri="test-uri",
+ body={"blabla: blabla"}
+ ),
+ AaiBulkRequest(
+ action="get",
+ uri="test-uri",
+ body={}
+ )], remove_failed_operation_on_failure=False))
+ assert len(aai_bulk.failed_requests) == 0
+ assert mock_send_message_json.call_count == 1
+
+
+def test_get_failed_operation_index():
+ aai_bulk = AaiBulk()
+ def get_formatted_response_error_with_operation(index: int) -> str:
+ return ('{"requestError":{"serviceException":{"messageId":"SVC3000","text":"Invalid input performing %1 on %2 (msg=%3)'
+ f' (ec=%4)","variables":["POST","v27/bulk/single-transaction","Invalid input performing %1 on %2:Error with operation {index}: '
+ 'Missing required property: physical-location-type,Missing required property: street1,Missing required property: city,'
+ 'Missing required property: postal-code,Missing required property: country,Missing required property: region","ERR.5.2.3000"]}}}')
+ def get_formatted_response_not_found(index: int) -> str:
+ return ('{"requestError":{"serviceException":{"messageId":"SVC3000","text":"Invalid input performing %1 on %2 (msg=%3) '
+ f'(ec=%4)","variables":["POST","v27/bulk/single-transaction","Invalid input performing %1 on %2:Operation {index} with action '
+ '(DELETE) on uri (/cloud-infrastructure/complexes/complex/test-parse-bulk-response) failed with status code (404), error code '
+ '(ERR.5.4.6114) and msg (Node Not Found:No Node of type complex found at: /cloud-infrastructure/complexes/complex/test-parse-bulk-response)","ERR.5.2.3000"]}}}')
+ assert aai_bulk._get_failed_operation_index(None) == -1
+ assert aai_bulk._get_failed_operation_index("Something on what there is no index") == -1
+ assert aai_bulk._get_failed_operation_index("There is an index: 0 but it's not a valid string") == -1
+ for i in [pow(10, x) for x in range(6)]:
+ assert aai_bulk._get_failed_operation_index(get_formatted_response_error_with_operation(i)) == i
+ assert aai_bulk._get_failed_operation_index(get_formatted_response_not_found(i)) == i