From c42b1c55eedbac2663ada8cf5b2a9a67560300e9 Mon Sep 17 00:00:00 2001 From: Pawel Date: Mon, 7 Jun 2021 10:46:00 +0200 Subject: Handle 429 error Too Many Requests Issue-ID: DCAEGEN2-2827 Signed-off-by: Pawel Change-Id: Iedfb6572f008876b52a102948aeb640d27e51314 --- .../dmaap/client/error/ClientErrorReasons.java | 9 +++- .../client/impl/MessageRouterPublisherImpl.java | 2 + .../model/config/DmaapConnectionPoolConfig.java | 3 +- .../dmaap/client/api/MessageRouterPublisherIT.java | 58 +++++++++++++++++++++- .../client/api/MessageRouterPublisherTest.java | 5 +- 5 files changed, 72 insertions(+), 5 deletions(-) (limited to 'rest-services/dmaap-client/src') diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java index 431843ae..f6e5c2bb 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java @@ -30,7 +30,8 @@ public class ClientErrorReasons { .header("408 Request Timeout") .text("Client timeout exception occurred, Error code is %1") .messageId("SVC0001") - .variables(Collections.singletonList("408")).build(); + .variables(Collections.singletonList("408")) + .build(); public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder() .header("503 Service unavailable") @@ -38,4 +39,10 @@ public class ClientErrorReasons { .messageId("SVC2001") .build(); + public static final ClientErrorReason CONNECTION_POLL_LIMIT = ImmutableClientErrorReason.builder() + .header("429 Too Many Requests") + .text("Pending acquire queue has reached its maximum size") + .messageId("SVC2000") + .variables(Collections.singletonList("429")) + .build(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 6e4679c3..534fca6b 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import java.net.ConnectException; import java.time.Duration; @@ -95,6 +96,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(PoolAcquirePendingLimitException.class, e -> buildErrorResponse(ClientErrorReasons.CONNECTION_POLL_LIMIT)) .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch))); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapConnectionPoolConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapConnectionPoolConfig.java index 38166e4e..4a4add0e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapConnectionPoolConfig.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapConnectionPoolConfig.java @@ -20,13 +20,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; import org.immutables.value.Value; +import reactor.netty.resources.ConnectionProvider; @Value.Immutable public interface DmaapConnectionPoolConfig { @Value.Default default int connectionPool(){ - return 16; + return ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; } @Value.Default default int maxLifeTime(){ diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 70adf59d..a1ad951f 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -95,6 +95,18 @@ class MessageRouterPublisherIT { + "}" + "}" + "}"; + private static final String CONNECTION_POLL_LIMIT_MESSAGE = "429 Too Many Requests\n" + + "{" + + "\"requestError\":" + + "{" + + "\"serviceException\":" + + "{" + + "\"messageId\":\"SVC2000\"," + + "\"text\":\"Pending acquire queue has reached its maximum size\"," + + "\"variables\":[\"429\"]" + + "}" + + "}" + + "}"; private final MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); @@ -570,6 +582,48 @@ class MessageRouterPublisherIT { .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); } + @Test + void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() { + //given + final String topic = "TOPIC17"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List twoJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux plainBatch = plainBatch(twoJsonMessages); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + + final MessageRouterPublishResponse expectedResponse = errorPublishResponse( + CONNECTION_POLL_LIMIT_MESSAGE); + + final String path = String.format("/events/%s", topic); + + //maxConnectionPoll + pendingAcquireMaxCount(default 2*maxConnectionPoll) + final int maxNumberOfConcurrentRequest = 3; + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.exactly(maxNumberOfConcurrentRequest)) + .respond(response().withStatusCode(429).withDelay(TimeUnit.SECONDS,1)); + + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(200)); + + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration()); + + //when + final Flux result = publisher.put(publishRequest, plainBatch); + + for(int i = 0; i < maxNumberOfConcurrentRequest; i++) { + publisher.put(publishRequest, plainBatch).subscribe(); + } + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() @@ -582,7 +636,7 @@ class MessageRouterPublisherIT { private MessageRouterPublisherConfig connectionPoolConfiguration() { return ImmutableMessageRouterPublisherConfig.builder() .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() - .connectionPool(10) + .connectionPool(1) .maxIdleTime(10) .maxLifeTime(20) .build()) @@ -592,7 +646,7 @@ class MessageRouterPublisherIT { private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() { return ImmutableMessageRouterPublisherConfig.builder() .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() - .connectionPool(10) + .connectionPool(1) .maxIdleTime(10) .maxLifeTime(20) .build()) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 82a2b008..97fd26f5 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -64,6 +64,7 @@ class MessageRouterPublisherTest { private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403"; private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404"; private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500"; + private static final String FAILING_WITH_429_RESP_PATH = "/events/TOPIC429"; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final Flux messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); @@ -82,6 +83,7 @@ class MessageRouterPublisherTest { .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE)) .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE)) .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)) + .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) ); } @@ -107,7 +109,8 @@ class MessageRouterPublisherTest { FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized", FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden", FAILING_WITH_404_RESP_PATH + "," + "404 Not Found", - FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error" + FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error", + FAILING_WITH_429_RESP_PATH + "," + "429 Too Many Requests" }) void publisher_shouldHandleError(String failingPath, String failReason) { //given -- cgit 1.2.3-korg