aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java49
1 files changed, 34 insertions, 15 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index 82b6661c..82a2b008 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -57,6 +56,7 @@ class MessageRouterPublisherTest {
private static final String ERROR_MESSAGE = "Something went wrong";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
+ private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
@@ -68,15 +68,13 @@ class MessageRouterPublisherTest {
private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
.map(JsonPrimitive::new);
private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final DummyHttpServer SERVER = initialize();
private MessageRouterPublisher sut = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
- @BeforeAll
- static void setUp() {
- server = DummyHttpServer.start(routes -> routes
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
.post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
.post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
@@ -90,7 +88,7 @@ class MessageRouterPublisherTest {
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
//when
@@ -113,7 +111,7 @@ class MessageRouterPublisherTest {
})
void publisher_shouldHandleError(String failingPath, String failReason) {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER);
final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
//when
@@ -142,8 +140,24 @@ class MessageRouterPublisherTest {
.verify(TIMEOUT);
}
- private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ @Test
+ void publisher_shouldHandleConnectionError() {
+ //given
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(
+ SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+
+ //then
+ StepVerifier.create(result)
+ .consumeNextWith(this::assertConnectionError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) {
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
@@ -151,7 +165,7 @@ class MessageRouterPublisherTest {
}
private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
@@ -159,12 +173,12 @@ class MessageRouterPublisherTest {
.build();
}
- private static MessageRouterSink createMRSink(String topicPath) {
+ private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
return ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
+ dummyHttpServer.host(),
+ dummyHttpServer.port(),
topicPath)
)
.build();
@@ -182,5 +196,10 @@ class MessageRouterPublisherTest {
assertThat(response.failed()).isTrue();
assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
}
+
+ private void assertConnectionError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+ }
}