summaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2021-02-01 10:26:29 +0100
committerTomasz Kogut <tomasz.kogut@nokia.com>2021-02-01 11:43:49 +0000
commite6a88a86d3b76b30efc1da367d619c71296b15e8 (patch)
treed087cdd511e9ebf0b64d20123cbdda02296dadc4 /rest-services/dmaap-client
parent214d24db845fe1485f91b03971c40640601881ca (diff)
Improve retry mechanism in dmaap-client.
Return last exception instead of timeout exception when retry exhausted. Handle no connection exception when sending requests to dmaap-mr. Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: Ibe318fa349b79999a5c8054e04e72e444a42ea78
Diffstat (limited to 'rest-services/dmaap-client')
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java2
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java8
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java12
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java10
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java1
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java35
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java49
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java34
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java44
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java58
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java56
11 files changed, 264 insertions, 45 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
index 9d255559..95850078 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
@@ -36,7 +36,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
import java.time.Duration;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES;
@@ -83,7 +82,6 @@ public final class DmaapClientFactory {
.retryCount(rc.retryCount())
.retryableHttpResponseCodes(RETRYABLE_HTTP_CODES)
.customRetryableExceptions(RETRYABLE_EXCEPTIONS)
- .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION)
.build())
.getOrNull();
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
index 5a51e5f2..431843ae 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,4 +32,10 @@ public class ClientErrorReasons {
.messageId("SVC0001")
.variables(Collections.singletonList("408")).build();
+ public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder()
+ .header("503 Service unavailable")
+ .text("DMaaP MR is unavailable")
+ .messageId("SVC2001")
+ .build();
+
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 7d1b0a93..5f72808e 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
import java.time.Duration;
import java.util.stream.Collectors;
@@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
.map(httpResponse -> buildResponse(httpResponse, batch))
- .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+ .doOnError(ReadTimeoutException.class,
+ e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+ .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+ .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch)));
}
private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
@@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
: builder.failReason(extractFailReason(httpResponse)).build();
}
- private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterPublishResponse.builder()
.failReason(failReason)
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index 292a7157..acb297ab 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -33,6 +33,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
@@ -72,10 +74,12 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
.map(this::buildGetResponse)
.doOnError(ReadTimeoutException.class,
e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+ .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+ .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+ .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
}
-
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
@@ -107,7 +111,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
request.consumerId());
}
- private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
.failReason(failReason)
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
index f82edfc9..bb662206 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
@@ -31,7 +31,6 @@ import java.net.ConnectException;
public interface DmaapRetryConfig {
Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class);
- RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE;
Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
@Value.Default
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index 159fc598..825ea395 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -438,6 +438,41 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Test
+ void publisher_shouldHandleLastRetryError500() {
+ final String topic = "TOPIC14";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final String responseMessage = "Response Message";
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ "500 Internal Server Error\n%s", responseMessage);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500).withBody(responseMessage));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index 82b6661c..82a2b008 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -57,6 +56,7 @@ class MessageRouterPublisherTest {
private static final String ERROR_MESSAGE = "Something went wrong";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
+ private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
@@ -68,15 +68,13 @@ class MessageRouterPublisherTest {
private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
.map(JsonPrimitive::new);
private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final DummyHttpServer SERVER = initialize();
private MessageRouterPublisher sut = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
- @BeforeAll
- static void setUp() {
- server = DummyHttpServer.start(routes -> routes
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
.post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
.post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
@@ -90,7 +88,7 @@ class MessageRouterPublisherTest {
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
//when
@@ -113,7 +111,7 @@ class MessageRouterPublisherTest {
})
void publisher_shouldHandleError(String failingPath, String failReason) {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER);
final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
//when
@@ -142,8 +140,24 @@ class MessageRouterPublisherTest {
.verify(TIMEOUT);
}
- private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ @Test
+ void publisher_shouldHandleConnectionError() {
+ //given
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(
+ SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+
+ //then
+ StepVerifier.create(result)
+ .consumeNextWith(this::assertConnectionError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) {
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
@@ -151,7 +165,7 @@ class MessageRouterPublisherTest {
}
private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
@@ -159,12 +173,12 @@ class MessageRouterPublisherTest {
.build();
}
- private static MessageRouterSink createMRSink(String topicPath) {
+ private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
return ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
+ dummyHttpServer.host(),
+ dummyHttpServer.port(),
topicPath)
)
.build();
@@ -182,5 +196,10 @@ class MessageRouterPublisherTest {
assertThat(response.failed()).isTrue();
assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
}
+
+ private void assertConnectionError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 15c3bd8e..2cc6d339 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+import io.vavr.control.Try;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -382,6 +383,39 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Test
+ void subscriber_shouldHandleLastRetryError500() {
+ //given
+ final String topic = "TOPIC9";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
+ final String responseMessage = "Response Message";
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ "500 Internal Server Error\n%s", responseMessage);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500).withBody(responseMessage));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest);
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
index 06875394..e928f03c 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -55,6 +54,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
class MessageRouterSubscriberTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String ERROR_MESSAGE = "Something went wrong";
+ private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
private static final String CONSUMER_GROUP = "group1";
private static final String SUCCESS_CONSUMER_ID = "consumer200";
@@ -82,16 +82,18 @@ class MessageRouterSubscriberTest {
private static final String FAILING_WITH_500_RESP_PATH = String
.format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
- private static MessageRouterSubscribeRequest mrSuccessRequest;
- private static MessageRouterSubscribeRequest mrFailingRequest;
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final DummyHttpServer SERVER = initialize();
+
private MessageRouterSubscriber sut = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static MessageRouterSource sourceDefinition;
-
+ private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
+ private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
+ private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
+ private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
- @BeforeAll
- static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(routes -> routes
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
.get(SUCCESS_RESP_PATH, (req, resp) ->
sendResource(resp, "/sample-mr-subscribe-response.json"))
.get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
@@ -100,12 +102,6 @@ class MessageRouterSubscriberTest {
.get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE))
.get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
.get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
-
- sourceDefinition = createMessageRouterSource(server);
-
- mrSuccessRequest = createSuccessRequest();
-
- mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
}
@Test
@@ -204,6 +200,17 @@ class MessageRouterSubscriberTest {
.verify(TIMEOUT);
}
+ @Test
+ void subscriber_shouldHandleConnectionError() {
+ MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ StepVerifier.create(response)
+ .consumeNextWith(this::assertConnectionError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
return ImmutableMessageRouterSource.builder()
.name("the topic")
@@ -211,9 +218,9 @@ class MessageRouterSubscriberTest {
.build();
}
- private static MessageRouterSubscribeRequest createSuccessRequest() {
+ private static MessageRouterSubscribeRequest createSuccessRequest(MessageRouterSource source) {
return ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
+ .sourceDefinition(source)
.consumerGroup(CONSUMER_GROUP)
.consumerId(SUCCESS_CONSUMER_ID)
.build();
@@ -249,5 +256,10 @@ class MessageRouterSubscriberTest {
assertThat(response.failed()).isTrue();
assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
}
+
+ private void assertConnectionError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index 2825a87c..2fde441d 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -46,6 +46,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest {
private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
+ private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
@@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(new ConnectException()));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(retryableHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertRetryableFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
@@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
.consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
@@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest {
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
.expectComplete()
.verify(TIMEOUT);
@@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest {
}
}
- private void assertTimeoutError(MessageRouterPublishResponse response) {
+ private void assertFailedResponse(MessageRouterPublishResponse response) {
assertThat(response.failed()).isTrue();
assertThat(response.items()).isEmpty();
assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
}
+ private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.items()).isEmpty();
+ assertThat(response.failReason()).startsWith("500 ERROR");
+ }
+
private void verifySingleResponse(List<? extends JsonElement> threeMessages,
Flux<MessageRouterPublishResponse> responses) {
StepVerifier.create(responses)
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
index 0396eff9..74b21ad6 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.gson.JsonSyntaxException;
@@ -42,6 +43,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
@@ -70,6 +73,12 @@ class MessageRouterSubscriberImplTest {
.url(sourceDefinition.topicUrl())
.rawBody("[]".getBytes())
.build();
+ private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder()
+ .statusCode(500)
+ .statusReason("Something braked")
+ .url(sourceDefinition.topicUrl())
+ .rawBody("[]".getBytes())
+ .build();
private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder()
.statusCode(301)
.statusReason("Something braked")
@@ -154,6 +163,53 @@ class MessageRouterSubscriberImplTest {
assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
assertThat(response.hasElements()).isFalse();
+ verify(clientErrorReasonPresenter, times(1)).present(any());
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+ assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+ mrRequest.consumerGroup(), mrRequest.consumerId()));
+ assertThat(httpRequest.body()).isNull();
+ }
+
+ @Test
+ void getWithProperRequest_shouldReturnConnectionException() {
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(new ConnectException()));
+
+ // when
+ final Mono<MessageRouterSubscribeResponse> responses = cut
+ .get(mrRequest);
+ final MessageRouterSubscribeResponse response = responses.block();
+
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
+ assertThat(response.hasElements()).isFalse();
+
+ verify(clientErrorReasonPresenter, times(1)).present(any());
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+ assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+ mrRequest.consumerGroup(), mrRequest.consumerId()));
+ assertThat(httpRequest.body()).isNull();
+ }
+
+ @Test
+ void getWithProperRequest_shouldReturnCertainFailedResponse() {
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(retryableHttpResponse));
+
+ // when
+ final Mono<MessageRouterSubscribeResponse> responses = cut
+ .get(mrRequest);
+ final MessageRouterSubscribeResponse response = responses.block();
+
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith("500 Something braked");
+ assertThat(response.hasElements()).isFalse();
verify(httpClient).call(httpRequestArgumentCaptor.capture());
final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();