diff options
15 files changed, 319 insertions, 112 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index 9d255559..95850078 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -36,7 +36,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import java.time.Duration; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES; @@ -83,7 +82,6 @@ public final class DmaapClientFactory { .retryCount(rc.retryCount()) .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES) .customRetryableExceptions(RETRYABLE_EXCEPTIONS) - .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION) .build()) .getOrNull(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java index 5a51e5f2..431843ae 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,4 +32,10 @@ public class ClientErrorReasons { .messageId("SVC0001") .variables(Collections.singletonList("408")).build(); + public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder() + .header("503 Service unavailable") + .text("DMaaP MR is unavailable") + .messageId("SVC2001") + .build(); + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 7d1b0a93..5f72808e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; @@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.ConnectException; import java.time.Duration; import java.util.stream.Collectors; @@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { LOGGER.trace("The items to be sent: {}", batch); return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)) - .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); + .doOnError(ReadTimeoutException.class, + e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e)) + .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) + .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) + .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch))); } private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { @@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { : builder.failReason(extractFailReason(httpResponse)).build(); } - private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index 292a7157..acb297ab 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -33,6 +33,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; @@ -72,10 +74,12 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .map(this::buildGetResponse) .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); + .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) + .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) + .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) + .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); } - private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder() .method(HttpMethod.GET) @@ -107,7 +111,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { request.consumerId()); } - private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) { + private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) { String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(failReason) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java index f82edfc9..bb662206 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java @@ -31,7 +31,6 @@ import java.net.ConnectException; public interface DmaapRetryConfig { Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class); - RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE; Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504); @Value.Default diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 159fc598..825ea395 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -438,6 +438,41 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Test + void publisher_shouldHandleLastRetryError500() { + final String topic = "TOPIC14"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + final String responseMessage = "Response Message"; + final MessageRouterPublishResponse expectedResponse = errorPublishResponse( + "500 Internal Server Error\n%s", responseMessage); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500).withBody(responseMessage)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1)); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterPublisherConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 82b6661c..82a2b008 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -57,6 +56,7 @@ class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; + private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable"; private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC"; private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY"; private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400"; @@ -68,15 +68,13 @@ class MessageRouterPublisherTest { private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); - - private static DummyHttpServer server; + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final DummyHttpServer SERVER = initialize(); private MessageRouterPublisher sut = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - - @BeforeAll - static void setUp() { - server = DummyHttpServer.start(routes -> routes + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE)) @@ -90,7 +88,7 @@ class MessageRouterPublisherTest { @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); //when @@ -113,7 +111,7 @@ class MessageRouterPublisherTest { }) void publisher_shouldHandleError(String failingPath, String failReason) { //given - final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER); final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason); //when @@ -142,8 +140,24 @@ class MessageRouterPublisherTest { .verify(TIMEOUT); } - private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + @Test + void publisher_shouldHandleConnectionError() { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest( + SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + + //then + StepVerifier.create(result) + .consumeNextWith(this::assertConnectionError) + .expectComplete() + .verify(TIMEOUT); + } + + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -151,7 +165,7 @@ class MessageRouterPublisherTest { } private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) { - final MessageRouterSink sinkDefinition = createMRSink(topicPath); + final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER); return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) .contentType(ContentType.TEXT_PLAIN) @@ -159,12 +173,12 @@ class MessageRouterPublisherTest { .build(); } - private static MessageRouterSink createMRSink(String topicPath) { + private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("http://%s:%d%s", - server.host(), - server.port(), + dummyHttpServer.host(), + dummyHttpServer.port(), topicPath) ) .build(); @@ -182,5 +196,10 @@ class MessageRouterPublisherTest { assertThat(response.failed()).isTrue(); assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); } + + private void assertConnectionError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 15c3bd8e..2cc6d339 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; +import io.vavr.control.Try; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -382,6 +383,39 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Test + void subscriber_shouldHandleLastRetryError500() { + //given + final String topic = "TOPIC9"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID); + final String responseMessage = "Response Message"; + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( + "500 Internal Server Error\n%s", responseMessage); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(500).withBody(responseMessage)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber( + retryConfig(1, 1)); + + //when + Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) { return ImmutableMessageRouterSubscriberConfig.builder() .retryConfig(ImmutableDmaapRetryConfig.builder() diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index 06875394..e928f03c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -55,6 +54,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String CONSUMER_GROUP = "group1"; private static final String SUCCESS_CONSUMER_ID = "consumer200"; @@ -82,16 +82,18 @@ class MessageRouterSubscriberTest { private static final String FAILING_WITH_500_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID); - private static MessageRouterSubscribeRequest mrSuccessRequest; - private static MessageRouterSubscribeRequest mrFailingRequest; + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final DummyHttpServer SERVER = initialize(); + private MessageRouterSubscriber sut = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - private static MessageRouterSource sourceDefinition; - + private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER); + private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER); + private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition); + private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); - @BeforeAll - static void setUp() { - DummyHttpServer server = DummyHttpServer.start(routes -> routes + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes .get(SUCCESS_RESP_PATH, (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json")) .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) @@ -100,12 +102,6 @@ class MessageRouterSubscriberTest { .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE)) .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); - - sourceDefinition = createMessageRouterSource(server); - - mrSuccessRequest = createSuccessRequest(); - - mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); } @Test @@ -204,6 +200,17 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } + @Test + void subscriber_shouldHandleConnectionError() { + MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + StepVerifier.create(response) + .consumeNextWith(this::assertConnectionError) + .expectComplete() + .verify(TIMEOUT); + } + private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { return ImmutableMessageRouterSource.builder() .name("the topic") @@ -211,9 +218,9 @@ class MessageRouterSubscriberTest { .build(); } - private static MessageRouterSubscribeRequest createSuccessRequest() { + private static MessageRouterSubscribeRequest createSuccessRequest(MessageRouterSource source) { return ImmutableMessageRouterSubscribeRequest.builder() - .sourceDefinition(sourceDefinition) + .sourceDefinition(source) .consumerGroup(CONSUMER_GROUP) .consumerId(SUCCESS_CONSUMER_ID) .build(); @@ -249,5 +256,10 @@ class MessageRouterSubscriberTest { assertThat(response.failed()).isTrue(); assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); } + + private void assertConnectionError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 2825a87c..2fde441d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -46,6 +46,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest { private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); + private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { @@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(new ConnectException())); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertFailedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() { + // given + final List<String> plainMessage = List.of("I", "like", "cookies"); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(retryableHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + StepVerifier.create(responses) + .consumeNextWith(this::assertRetryableFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response)) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .expectComplete() .verify(TIMEOUT); } @@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest { // then StepVerifier.create(responses) - .consumeNextWith(this::assertTimeoutError) + .consumeNextWith(this::assertFailedResponse) .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response)) .expectComplete() .verify(TIMEOUT); @@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest { } } - private void assertTimeoutError(MessageRouterPublishResponse response) { + private void assertFailedResponse(MessageRouterPublishResponse response) { assertThat(response.failed()).isTrue(); assertThat(response.items()).isEmpty(); assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); } + private void assertRetryableFailedResponse(MessageRouterPublishResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.items()).isEmpty(); + assertThat(response.failReason()).startsWith("500 ERROR"); + } + private void verifySingleResponse(List<? extends JsonElement> threeMessages, Flux<MessageRouterPublishResponse> responses) { StepVerifier.create(responses) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 0396eff9..74b21ad6 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; @@ -42,6 +43,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Mono; +import java.net.ConnectException; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 @@ -70,6 +73,12 @@ class MessageRouterSubscriberImplTest { .url(sourceDefinition.topicUrl()) .rawBody("[]".getBytes()) .build(); + private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder() + .statusCode(500) + .statusReason("Something braked") + .url(sourceDefinition.topicUrl()) + .rawBody("[]".getBytes()) + .build(); private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder() .statusCode(301) .statusReason("Something braked") @@ -154,6 +163,53 @@ class MessageRouterSubscriberImplTest { assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); assertThat(response.hasElements()).isFalse(); + verify(clientErrorReasonPresenter, times(1)).present(any()); + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET); + assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(), + mrRequest.consumerGroup(), mrRequest.consumerId())); + assertThat(httpRequest.body()).isNull(); + } + + @Test + void getWithProperRequest_shouldReturnConnectionException() { + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(new ConnectException())); + + // when + final Mono<MessageRouterSubscribeResponse> responses = cut + .get(mrRequest); + final MessageRouterSubscribeResponse response = responses.block(); + + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); + assertThat(response.hasElements()).isFalse(); + + verify(clientErrorReasonPresenter, times(1)).present(any()); + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET); + assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(), + mrRequest.consumerGroup(), mrRequest.consumerId())); + assertThat(httpRequest.body()).isNull(); + } + + @Test + void getWithProperRequest_shouldReturnCertainFailedResponse() { + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(retryableHttpResponse)); + + // when + final Mono<MessageRouterSubscribeResponse> responses = cut + .get(mrRequest); + final MessageRouterSubscribeResponse response = responses.block(); + + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith("500 Something braked"); + assertThat(response.hasElements()).isFalse(); verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index 76bde27e..d25d7469 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -25,6 +25,7 @@ import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,12 +84,17 @@ public class RxHttpClient { } private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { - if (shouldRetry(status.code())) { - return Mono.error(new RetryConfig.RetryableException()); - } return content.asByteArray() .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(url, status, bytes)); + .map(bytes -> new NettyHttpResponse(url, status, bytes)) + .map(this::validatedResponse); + } + + private HttpResponse validatedResponse(HttpResponse response) { + if (shouldRetry(response.statusCode())) { + throw new RetryableException(response); + } + return response; } private boolean shouldRetry(int code) { @@ -149,15 +155,12 @@ public class RxHttpClient { } private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { - RetryBackoffSpec retry = Retry + return Retry .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) .doBeforeRetry(retrySignal -> context.withSlf4jMdc( LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) - .filter(ex -> isRetryable(retryConfig, ex)); - - return Option.of(retryConfig.onRetryExhaustedException()) - .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex)) - .getOrElse(retry); + .filter(ex -> isRetryable(retryConfig, ex)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); } private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java index a0ae1991..e4584905 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java @@ -23,7 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config; import io.vavr.collection.HashSet; import io.vavr.collection.Set; import org.immutables.value.Value; -import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import java.time.Duration; @@ -52,8 +52,4 @@ public interface RetryConfig { } return result; } - - @Nullable RuntimeException onRetryExhaustedException(); - - class RetryableException extends RuntimeException {} } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java new file mode 100644 index 00000000..aa48497a --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions; + +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; + +public class RetryableException extends RuntimeException { + + private final HttpResponse response; + + public RetryableException(HttpResponse response) { + this.response = response; + } + + public HttpResponse getResponse() { + return response; + } +} diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java index daf04c6e..8d076e02 100644 --- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -214,7 +215,7 @@ class RxHttpClientIT { } @Test - void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception { + void getWithConnectExceptionWhenClosedServer() throws Exception { // given REQUEST_COUNTER = new AtomicInteger(); final HttpRequest httpRequest = requestForClosedServer("/sample-get") @@ -231,37 +232,13 @@ class RxHttpClientIT { // then StepVerifier.create(response) - .expectError(IllegalStateException.class) + .expectError(ConnectException.class) .verify(TIMEOUT); assertNoServerResponse(); } @Test - void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception { - // given - REQUEST_COUNTER = new AtomicInteger(); - final HttpRequest httpRequest = requestForClosedServer("/sample-get") - .method(HttpMethod.GET) - .build(); - final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() - .retryConfig(defaultRetryConfig() - .customRetryableExceptions(HashSet.of(ConnectException.class)) - .onRetryExhaustedException(ReadTimeoutException.INSTANCE) - .build()) - .build()); - - // when - final Mono<HttpResponse> response = cut.call(httpRequest); - - // then - StepVerifier.create(response) - .expectError(ReadTimeoutException.class) - .verify(TIMEOUT); - assertNoServerResponse(); - } - - @Test - void getWithRetryExhaustedExceptionWhen500() throws Exception { + void getWithRetryableExceptionWhen500() throws Exception { // given REQUEST_COUNTER = new AtomicInteger(); final HttpRequest httpRequest = requestFor("/retry-get-500") @@ -278,31 +255,7 @@ class RxHttpClientIT { // then StepVerifier.create(response) - .expectError(IllegalStateException.class) - .verify(TIMEOUT); - assertRetry(); - } - - @Test - void getWithCustomRetryExhaustedExceptionWhen500() throws Exception { - // given - REQUEST_COUNTER = new AtomicInteger(); - final HttpRequest httpRequest = requestFor("/retry-get-500") - .method(HttpMethod.GET) - .build(); - final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() - .retryConfig(defaultRetryConfig() - .onRetryExhaustedException(ReadTimeoutException.INSTANCE) - .retryableHttpResponseCodes(HashSet.of(500)) - .build()) - .build()); - - // when - final Mono<HttpResponse> response = cut.call(httpRequest); - - // then - StepVerifier.create(response) - .expectError(ReadTimeoutException.class) + .expectError(RetryableException.class) .verify(TIMEOUT); assertRetry(); } |