diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java | 49 |
1 files changed, 34 insertions, 15 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 82b6661c..82a2b008 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -57,6 +56,7 @@ class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; + private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable"; private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC"; private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY"; private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400"; @@ -68,15 +68,13 @@ class MessageRouterPublisherTest { private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); - - private static DummyHttpServer server; + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final DummyHttpServer SERVER = initialize(); private MessageRouterPublisher sut = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - - @BeforeAll - static void setUp() { - server = DummyHttpServer.start(routes -> routes + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE)) @@ -90,7 +88,7 @@ class MessageRouterPublisherTest { @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); //when @@ -113,7 +111,7 @@ class MessageRouterPublisherTest { }) void publisher_shouldHandleError(String failingPath, String failReason) { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER); final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason); //when @@ -142,8 +140,24 @@ class MessageRouterPublisherTest { .verify(TIMEOUT); } - private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + @Test + void publisher_shouldHandleConnectionError() { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest( + SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + + //then + StepVerifier.create(result) + .consumeNextWith(this::assertConnectionError) + .expectComplete() + .verify(TIMEOUT); + } + + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -151,7 +165,7 @@ class MessageRouterPublisherTest { } private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -159,12 +173,12 @@ class MessageRouterPublisherTest { .build(); } - private static MessageRouterSink createMRSink(String topicPath) { + private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("http://%s:%d%s", - server.host(), - server.port(), + dummyHttpServer.host(), + dummyHttpServer.port(), topicPath) ) .build(); @@ -182,5 +196,10 @@ class MessageRouterPublisherTest { assertThat(response.failed()).isTrue(); assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); } + + private void assertConnectionError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE); + } } |