diff options
author | tkogut <tomasz.kogut@nokia.com> | 2021-01-19 09:00:56 +0100 |
---|---|---|
committer | tkogut <tomasz.kogut@nokia.com> | 2021-01-20 12:20:55 +0100 |
commit | 9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 (patch) | |
tree | 58c9e881f694fde8347762b6c237de9423f33f23 /rest-services/http-client | |
parent | 286637d4a801ab6e933684500509eab308d2e3a6 (diff) |
Support retry in DCAE-SDK DMaaP-Client
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e
Diffstat (limited to 'rest-services/http-client')
6 files changed, 458 insertions, 35 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index 77b842d7..d0bdf414 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,25 @@ * limitations under the License. * ============LICENSE_END===================================== */ + package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; import java.util.stream.Collectors; @@ -39,17 +46,23 @@ public class RxHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); private final HttpClient httpClient; + private RetryConfig retryConfig; RxHttpClient(HttpClient httpClient) { this.httpClient = httpClient; } + RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) { + this(httpClient); + this.retryConfig = retryConfig; + } + public Mono<HttpResponse> call(HttpRequest request) { - return prepareRequest(request) - .responseSingle((resp, content) -> - content.asByteArray() - .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes))); + Mono<HttpResponse> httpResponseMono = response(request); + return Option.of(retryConfig) + .map(rc -> retryConfig(rc, request.diagnosticContext())) + .map(httpResponseMono::retryWhen) + .getOrElse(() -> httpResponseMono); } ResponseReceiver<?> prepareRequest(HttpRequest request) { @@ -65,6 +78,27 @@ public class RxHttpClient { return prepareBody(request, theClient); } + private Mono<HttpResponse> response(HttpRequest request) { + return prepareRequest(request) + .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content)); + } + + private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { + if (shouldRetry(status.code())) { + return Mono.error(new RetryConfig.RetryableException()); + } + return content.asByteArray() + .defaultIfEmpty(new byte[0]) + .map(bytes -> new NettyHttpResponse(url, status, bytes)); + } + + private boolean shouldRetry(int code) { + return Option.of(retryConfig) + .map(RetryConfig::retryableHttpResponseCodes) + .getOrElse(HashSet::empty) + .contains(code); + } + private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { if (request.body() == null) { return prepareBodyWithoutContents(request, theClient); @@ -79,7 +113,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED)) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -87,7 +121,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -114,4 +148,22 @@ public class RxHttpClient { context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); } + + private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { + RetryBackoffSpec retry = Retry + .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) + .doBeforeRetry(retrySignal -> context.withSlf4jMdc( + LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) + .filter(ex -> isRetryable(retryConfig, ex)); + + return Option.of(retryConfig.onRetryExhaustedException()) + .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex)) + .getOrElse(retry); + } + + private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { + return retryConfig.retryableExceptions() + .toStream() + .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); + } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java index 9b23f1d9..118df52b 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.netty.handler.ssl.SslContext; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig; import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys; @@ -42,23 +44,53 @@ public final class RxHttpClientFactory { return new RxHttpClient(HttpClient.create()); } + public static RxHttpClient create(RxHttpClientConfig config) { + return createWithConfig(HttpClient.create(), config); + } public static RxHttpClient create(SecurityKeys securityKeys) { final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys); return create(context); } + public static RxHttpClient create(SecurityKeys securityKeys, RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys); + return create(context, config); + } + public static RxHttpClient create(TrustStoreKeys trustStoreKeys) { final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys); return create(context); } + public static RxHttpClient create(TrustStoreKeys trustStoreKeys, RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys); + return create(context, config); + } + public static RxHttpClient createInsecure() { final SslContext context = SSL_FACTORY.createInsecureClientContext(); return create(context); } + public static RxHttpClient createInsecure(RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createInsecureClientContext(); + return create(context, config); + } + private static RxHttpClient create(@NotNull SslContext sslContext) { - return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); + HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)); + return new RxHttpClient(secure); + } + + private static RxHttpClient create(@NotNull SslContext sslContext, RxHttpClientConfig config) { + HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)); + return createWithConfig(secure, config); + } + + private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) { + return Option.of(config.retryConfig()) + .map(retryConfig -> new RxHttpClient(httpClient, retryConfig)) + .getOrElse(() -> new RxHttpClient(httpClient)); } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java new file mode 100644 index 00000000..a0ae1991 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config; + +import io.vavr.collection.HashSet; +import io.vavr.collection.Set; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +import java.time.Duration; + +@Value.Immutable +public interface RetryConfig { + + int retryCount(); + + Duration retryInterval(); + + @Value.Default + default Set<Integer> retryableHttpResponseCodes() { + return HashSet.empty(); + } + + @Value.Default + default Set<Class<? extends Throwable>> customRetryableExceptions() { + return HashSet.empty(); + } + + @Value.Derived + default Set<Class<? extends Throwable>> retryableExceptions() { + Set<Class<? extends Throwable>> result = customRetryableExceptions(); + if (retryableHttpResponseCodes().nonEmpty()) { + result = result.add(RetryableException.class); + } + return result; + } + + @Nullable RuntimeException onRetryExhaustedException(); + + class RetryableException extends RuntimeException {} +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java new file mode 100644 index 00000000..78a88a47 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +@Value.Immutable +public interface RxHttpClientConfig { + @Nullable RetryConfig retryConfig(); +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java index 4795b00f..8ac0d1d5 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test; import io.vavr.CheckedFunction0; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; +import io.vavr.Tuple3; +import io.vavr.control.Try; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +32,13 @@ import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerResponse; import reactor.netty.http.server.HttpServerRoutes; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since February 2019 @@ -63,11 +67,26 @@ public class DummyHttpServer { return responses[state.getAndIncrement()]; } + public static Publisher<Void> sendInOrderWithDelay(AtomicInteger counter, Tuple3<HttpServerResponse, Integer, Duration>... responses) { + Tuple3<HttpServerResponse, Integer, Duration> tuple = responses[counter.get()]; + HttpServerResponse httpServerResponse = tuple._1; + Integer statusCode = tuple._2; + long timeout = tuple._3.toMillis(); + Try.run(() -> Thread.sleep(timeout)); + counter.incrementAndGet(); + return sendString(httpServerResponse.status(statusCode), Mono.just("OK")); + } + + public static Publisher<Void> sendWithDelay(HttpServerResponse response, int statusCode, Duration timeout) { + Try.run(() -> Thread.sleep(timeout.toMillis())); + return sendString(response.status(statusCode), Mono.just("OK")); + } + public static Publisher<Void> sendResource(HttpServerResponse httpServerResponse, String resourcePath) { return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath))); } - public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){ + public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message) { return sendString(httpServerResponse.status(statusCode), Mono.just(message)); } @@ -79,6 +98,11 @@ public class DummyHttpServer { server.disposeNow(); } + public DummyHttpServer closeAndGet() { + server.disposeNow(); + return this; + } + public String host() { return server.host(); } diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java index 6f3a0909..daf04c6e 100644 --- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,33 +22,55 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.Tuple; +import io.vavr.collection.HashSet; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; import java.net.MalformedURLException; import java.net.URL; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; class RxHttpClientIT { private static final Duration TIMEOUT = Duration.ofHours(5); - private final RxHttpClient cut = RxHttpClientFactory.create(); - private static DummyHttpServer httpServer; - - @BeforeAll - static void setUpClass() { - httpServer = DummyHttpServer.start(routes -> routes - .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) - .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1)))) + private static final Duration NO_DELAY = Duration.ofSeconds(0); + private static final int RETRY_COUNT = 1; + private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1; + private static final DummyHttpServer HTTP_SERVER = initialize(); + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final Mono<String> OK = Mono.just("OK"); + private static final Duration RETRY_INTERVAL = Duration.ofMillis(1); + private static AtomicInteger REQUEST_COUNTER; + + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes + .get("/sample-get", (req, resp) -> sendString(resp, OK)) + .get("/delay-get", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3)))) .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) + .get("/retry-get-500", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, + Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY))) + .get("/retry-get-400", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY))) + .get("/retry-get-500-200", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, + Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY))) + .get("/retry-get-200", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY))) .post("/headers-post", (req, resp) -> resp .sendString(Mono.just(req.requestHeaders().toString()))) .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) @@ -57,12 +79,7 @@ class RxHttpClientIT { @AfterAll static void tearDownClass() { - httpServer.close(); - } - - private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException { - return ImmutableHttpRequest.builder() - .url(new URL("http", httpServer.host(), httpServer.port(), path).toString()); + HTTP_SERVER.close(); } @Test @@ -71,6 +88,7 @@ class RxHttpClientIT { final HttpRequest httpRequest = requestFor("/sample-get") .method(HttpMethod.GET) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono<String> bodyAsString = cut.call(httpRequest) @@ -90,6 +108,7 @@ class RxHttpClientIT { final HttpRequest httpRequest = requestFor("/sample-get-500") .method(HttpMethod.GET) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono<String> bodyAsString = cut.call(httpRequest) @@ -110,6 +129,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.fromString(requestBody)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono<String> bodyAsString = cut.call(httpRequest) @@ -131,6 +151,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.chunkedFromString(Mono.just(requestBody))) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono<String> bodyAsString = cut.call(httpRequest) @@ -155,6 +176,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.fromString(requestBody)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono<String> bodyAsString = cut.call(httpRequest) @@ -174,10 +196,12 @@ class RxHttpClientIT { @Test void getWithTimeoutError() throws Exception { // given - final HttpRequest httpRequest = requestFor("/delayed-get") + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/delay-get") .method(HttpMethod.GET) - .timeout(Duration.ofSeconds(1)) + .timeout(Duration.ofMillis(1)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build()); // when final Mono<HttpResponse> response = cut.call(httpRequest); @@ -186,5 +210,208 @@ class RxHttpClientIT { StepVerifier.create(response) .expectError(ReadTimeoutException.class) .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestForClosedServer("/sample-get") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .customRetryableExceptions(HashSet.of(ConnectException.class)) + .build()) + .build()); + + // when + final Mono<HttpResponse> response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestForClosedServer("/sample-get") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .customRetryableExceptions(HashSet.of(ConnectException.class)) + .onRetryExhaustedException(ReadTimeoutException.INSTANCE) + .build()) + .build()); + + // when + final Mono<HttpResponse> response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(ReadTimeoutException.class) + .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithRetryExhaustedExceptionWhen500() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono<HttpResponse> response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithCustomRetryExhaustedExceptionWhen500() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .onRetryExhaustedException(ReadTimeoutException.INSTANCE) + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono<HttpResponse> response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(ReadTimeoutException.class) + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithRetryWhen500AndThen200() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500-200") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString) + .expectNext("OK") + .expectComplete() + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithoutRetryWhen200() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-200") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString) + .expectNext("OK") + .expectComplete() + .verify(TIMEOUT); + assertNoRetry(); + } + + @Test + void getWithoutRetryWhen400() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-400") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + Mono<HttpResponse> result = cut.call(httpRequest); + + // then + StepVerifier.create(result) + .consumeNextWith(this::assert400) + .expectComplete() + .verify(TIMEOUT); + assertNoRetry(); + } + + private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException { + return ImmutableHttpRequest.builder() + .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString()); + } + + private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException { + return ImmutableHttpRequest.builder() + .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString()); + } + + private ImmutableRetryConfig.Builder defaultRetryConfig() { + return ImmutableRetryConfig.builder() + .retryCount(RETRY_COUNT) + .retryInterval(RETRY_INTERVAL); + } + + private void assertRetry() { + assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY); + } + + private void assertNoRetry() { + assertThat(REQUEST_COUNTER.get()).isOne(); + } + + private void assertNoServerResponse() { + assertThat(REQUEST_COUNTER.get()).isZero(); + } + + private void assert400(HttpResponse httpResponse) { + assertThat(httpResponse.statusCode()).isEqualTo(400); } } |