diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java | 58 |
1 files changed, 56 insertions, 2 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 70adf59d..a1ad951f 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -95,6 +95,18 @@ class MessageRouterPublisherIT { + "}" + "}" + "}"; + private static final String CONNECTION_POLL_LIMIT_MESSAGE = "429 Too Many Requests\n" + + "{" + + "\"requestError\":" + + "{" + + "\"serviceException\":" + + "{" + + "\"messageId\":\"SVC2000\"," + + "\"text\":\"Pending acquire queue has reached its maximum size\"," + + "\"variables\":[\"429\"]" + + "}" + + "}" + + "}"; private final MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); @@ -570,6 +582,48 @@ class MessageRouterPublisherIT { .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); } + @Test + void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() { + //given + final String topic = "TOPIC17"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + + final MessageRouterPublishResponse expectedResponse = errorPublishResponse( + CONNECTION_POLL_LIMIT_MESSAGE); + + final String path = String.format("/events/%s", topic); + + //maxConnectionPoll + pendingAcquireMaxCount(default 2*maxConnectionPoll) + final int maxNumberOfConcurrentRequest = 3; + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.exactly(maxNumberOfConcurrentRequest)) + .respond(response().withStatusCode(429).withDelay(TimeUnit.SECONDS,1)); + + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(200)); + + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration()); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + for(int i = 0; i < maxNumberOfConcurrentRequest; i++) { + publisher.put(publishRequest, plainBatch).subscribe(); + } + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() @@ -582,7 +636,7 @@ class MessageRouterPublisherIT { private MessageRouterPublisherConfig connectionPoolConfiguration() { return ImmutableMessageRouterPublisherConfig.builder() .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() - .connectionPool(10) + .connectionPool(1) .maxIdleTime(10) .maxLifeTime(20) .build()) @@ -592,7 +646,7 @@ class MessageRouterPublisherIT { private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() { return ImmutableMessageRouterPublisherConfig.builder() .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() - .connectionPool(10) + .connectionPool(1) .maxIdleTime(10) .maxLifeTime(20) .build()) |