diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java | 58 |
1 files changed, 54 insertions, 4 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 2825a87c..2fde441d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -46,6 +46,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest { private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); + private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { @@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(new ConnectException())); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(retryableHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertRetryableFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response)) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response)) .expectComplete() .verify(TIMEOUT); @@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest { } } - private void assertTimeoutError(MessageRouterPublishResponse response) { + private void assertFailedResponse(MessageRouterPublishResponse response) { assertThat(response.failed()).isTrue(); assertThat(response.items()).isEmpty(); assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); } + private void assertRetryableFailedResponse(MessageRouterPublishResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.items()).isEmpty(); + assertThat(response.failReason()).startsWith("500 ERROR"); + } + private void verifySingleResponse(List<? extends JsonElement> threeMessages, Flux<MessageRouterPublishResponse> responses) { StepVerifier.create(responses) |