aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java58
1 files changed, 54 insertions, 4 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index 2825a87c..2fde441d 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -46,6 +46,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest {
private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
+ private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
@@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(new ConnectException()));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(retryableHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertRetryableFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
@@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
.consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
@@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
.expectComplete()
.verify(TIMEOUT);
@@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest {
}
}
- private void assertTimeoutError(MessageRouterPublishResponse response) {
+ private void assertFailedResponse(MessageRouterPublishResponse response) {
assertThat(response.failed()).isTrue();
assertThat(response.items()).isEmpty();
assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
}
+ private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.items()).isEmpty();
+ assertThat(response.failReason()).startsWith("500 ERROR");
+ }
+
private void verifySingleResponse(List<? extends JsonElement> threeMessages,
Flux<MessageRouterPublishResponse> responses) {
StepVerifier.create(responses)