diff options
author | tkogut <tomasz.kogut@nokia.com> | 2021-02-01 10:26:29 +0100 |
---|---|---|
committer | Tomasz Kogut <tomasz.kogut@nokia.com> | 2021-02-01 11:43:49 +0000 |
commit | e6a88a86d3b76b30efc1da367d619c71296b15e8 (patch) | |
tree | d087cdd511e9ebf0b64d20123cbdda02296dadc4 /rest-services/dmaap-client/src | |
parent | 214d24db845fe1485f91b03971c40640601881ca (diff) |
Improve retry mechanism in dmaap-client.
Return last exception instead of timeout exception when retry exhausted.
Handle no connection exception when sending requests to dmaap-mr.
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ibe318fa349b79999a5c8054e04e72e444a42ea78
Diffstat (limited to 'rest-services/dmaap-client/src')
11 files changed, 264 insertions, 45 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index 9d255559..95850078 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -36,7 +36,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import java.time.Duration; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES; @@ -83,7 +82,6 @@ public final class DmaapClientFactory { .retryCount(rc.retryCount()) .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES) .customRetryableExceptions(RETRYABLE_EXCEPTIONS) - .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION) .build()) .getOrNull(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java index 5a51e5f2..431843ae 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,4 +32,10 @@ public class ClientErrorReasons { .messageId("SVC0001") .variables(Collections.singletonList("408")).build(); + public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder() + .header("503 Service unavailable") + .text("DMaaP MR is unavailable") + .messageId("SVC2001") + .build(); + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 7d1b0a93..5f72808e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; @@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.ConnectException; import java.time.Duration; import java.util.stream.Collectors; @@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)) - .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); + .doOnError(ReadTimeoutException.class, + e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) + .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) + .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch))); } private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { @@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { : builder.failReason(extractFailReason(httpResponse)).build(); } - private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index 292a7157..acb297ab 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -33,6 +33,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; @@ -72,10 +74,12 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .map(this::buildGetResponse) .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); + .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) + .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) + .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); } - private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() .method(HttpMethod.GET) @@ -107,7 +111,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { request.consumerId()); } - private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(failReason) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java index f82edfc9..bb662206 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java @@ -31,7 +31,6 @@ import java.net.ConnectException; public interface DmaapRetryConfig { Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class); - RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE; Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504); @Value.Default diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 159fc598..825ea395 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -438,6 +438,41 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Test + void publisher_shouldHandleLastRetryError500() { + final String topic = "TOPIC14"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + final String responseMessage = "Response Message"; + final MessageRouterPublishResponse expectedResponse = errorPublishResponse( + "500 Internal Server Error\n%s", responseMessage); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500).withBody(responseMessage)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1)); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 82b6661c..82a2b008 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -57,6 +56,7 @@ class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; + private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable"; private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC"; private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY"; private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400"; @@ -68,15 +68,13 @@ class MessageRouterPublisherTest { private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); - - private static DummyHttpServer server; + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final DummyHttpServer SERVER = initialize(); private MessageRouterPublisher sut = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - - @BeforeAll - static void setUp() { - server = DummyHttpServer.start(routes -> routes + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE)) @@ -90,7 +88,7 @@ class MessageRouterPublisherTest { @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); //when @@ -113,7 +111,7 @@ class MessageRouterPublisherTest { }) void publisher_shouldHandleError(String failingPath, String failReason) { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER); final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason); //when @@ -142,8 +140,24 @@ class MessageRouterPublisherTest { .verify(TIMEOUT); } - private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + @Test + void publisher_shouldHandleConnectionError() { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest( + SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + + //then + StepVerifier.create(result) + .consumeNextWith(this::assertConnectionError) + .expectComplete() + .verify(TIMEOUT); + } + + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -151,7 +165,7 @@ class MessageRouterPublisherTest { } private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -159,12 +173,12 @@ class MessageRouterPublisherTest { .build(); } - private static MessageRouterSink createMRSink(String topicPath) { + private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("http://%s:%d%s", - server.host(), - server.port(), + dummyHttpServer.host(), + dummyHttpServer.port(), topicPath) ) .build(); @@ -182,5 +196,10 @@ class MessageRouterPublisherTest { assertThat(response.failed()).isTrue(); assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); } + + private void assertConnectionError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 15c3bd8e..2cc6d339 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; +import io.vavr.control.Try; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -382,6 +383,39 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Test + void subscriber_shouldHandleLastRetryError500() { + //given + final String topic = "TOPIC9"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID); + final String responseMessage = "Response Message"; + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( + "500 Internal Server Error\n%s", responseMessage); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500).withBody(responseMessage)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber( + retryConfig(1, 1)); + + //when + Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterSubscriberConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index 06875394..e928f03c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -55,6 +54,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String CONSUMER_GROUP = "group1"; private static final String SUCCESS_CONSUMER_ID = "consumer200"; @@ -82,16 +82,18 @@ class MessageRouterSubscriberTest { private static final String FAILING_WITH_500_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID); - private static MessageRouterSubscribeRequest mrSuccessRequest; - private static MessageRouterSubscribeRequest mrFailingRequest; + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final DummyHttpServer SERVER = initialize(); + private MessageRouterSubscriber sut = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - private static MessageRouterSource sourceDefinition; - + private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER); + private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER); + private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition); + private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); - @BeforeAll - static void setUp() { - DummyHttpServer server = DummyHttpServer.start(routes -> routes + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes .get(SUCCESS_RESP_PATH, (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json")) .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) @@ -100,12 +102,6 @@ class MessageRouterSubscriberTest { .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE)) .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); - - sourceDefinition = createMessageRouterSource(server); - - mrSuccessRequest = createSuccessRequest(); - - mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); } @Test @@ -204,6 +200,17 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } + @Test + void subscriber_shouldHandleConnectionError() { + MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + StepVerifier.create(response) + .consumeNextWith(this::assertConnectionError) + .expectComplete() + .verify(TIMEOUT); + } + private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { return ImmutableMessageRouterSource.builder() .name("the topic") @@ -211,9 +218,9 @@ class MessageRouterSubscriberTest { .build(); } - private static MessageRouterSubscribeRequest createSuccessRequest() { + private static MessageRouterSubscribeRequest createSuccessRequest(MessageRouterSource source) { return ImmutableMessageRouterSubscribeRequest.builder() - .sourceDefinition(sourceDefinition) + .sourceDefinition(source) .consumerGroup(CONSUMER_GROUP) .consumerId(SUCCESS_CONSUMER_ID) .build(); @@ -249,5 +256,10 @@ class MessageRouterSubscriberTest { assertThat(response.failed()).isTrue(); assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); } + + private void assertConnectionError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 2825a87c..2fde441d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -46,6 +46,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest { private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); + private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { @@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(new ConnectException())); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(retryableHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertRetryableFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response)) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response)) .expectComplete() .verify(TIMEOUT); @@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest { } } - private void assertTimeoutError(MessageRouterPublishResponse response) { + private void assertFailedResponse(MessageRouterPublishResponse response) { assertThat(response.failed()).isTrue(); assertThat(response.items()).isEmpty(); assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); } + private void assertRetryableFailedResponse(MessageRouterPublishResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.items()).isEmpty(); + assertThat(response.failReason()).startsWith("500 ERROR"); + } + private void verifySingleResponse(List<? extends JsonElement> threeMessages, Flux<MessageRouterPublishResponse> responses) { StepVerifier.create(responses) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 0396eff9..74b21ad6 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; @@ -42,6 +43,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Mono; +import java.net.ConnectException; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 @@ -70,6 +73,12 @@ class MessageRouterSubscriberImplTest { .url(sourceDefinition.topicUrl()) .rawBody("[]".getBytes()) .build(); + private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder() + .statusCode(500) + .statusReason("Something braked") + .url(sourceDefinition.topicUrl()) + .rawBody("[]".getBytes()) + .build(); private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder() .statusCode(301) .statusReason("Something braked") @@ -154,6 +163,53 @@ class MessageRouterSubscriberImplTest { assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); assertThat(response.hasElements()).isFalse(); + verify(clientErrorReasonPresenter, times(1)).present(any()); + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET); + assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(), + mrRequest.consumerGroup(), mrRequest.consumerId())); + assertThat(httpRequest.body()).isNull(); + } + + @Test + void getWithProperRequest_shouldReturnConnectionException() { + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(new ConnectException())); + + // when + final Mono<MessageRouterSubscribeResponse> responses = cut + .get(mrRequest); + final MessageRouterSubscribeResponse response = responses.block(); + + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); + assertThat(response.hasElements()).isFalse(); + + verify(clientErrorReasonPresenter, times(1)).present(any()); + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET); + assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(), + mrRequest.consumerGroup(), mrRequest.consumerId())); + assertThat(httpRequest.body()).isNull(); + } + + @Test + void getWithProperRequest_shouldReturnCertainFailedResponse() { + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(retryableHttpResponse)); + + // when + final Mono<MessageRouterSubscribeResponse> responses = cut + .get(mrRequest); + final MessageRouterSubscribeResponse response = responses.block(); + + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith("500 Something braked"); + assertThat(response.hasElements()).isFalse(); verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); |