summaryrefslogtreecommitdiffstats
path: root/rest-services/http-client
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2021-01-19 09:00:56 +0100
committertkogut <tomasz.kogut@nokia.com>2021-01-20 12:20:55 +0100
commit9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 (patch)
tree58c9e881f694fde8347762b6c237de9423f33f23 /rest-services/http-client
parent286637d4a801ab6e933684500509eab308d2e3a6 (diff)
Support retry in DCAE-SDK DMaaP-Client
Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e
Diffstat (limited to 'rest-services/http-client')
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java68
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java36
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java59
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java29
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java38
-rw-r--r--rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java263
6 files changed, 458 insertions, 35 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index 77b842d7..d0bdf414 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,18 +17,25 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
+
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vavr.collection.HashSet;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
import java.util.stream.Collectors;
@@ -39,17 +46,23 @@ public class RxHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
private final HttpClient httpClient;
+ private RetryConfig retryConfig;
RxHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
+ RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+ this(httpClient);
+ this.retryConfig = retryConfig;
+ }
+
public Mono<HttpResponse> call(HttpRequest request) {
- return prepareRequest(request)
- .responseSingle((resp, content) ->
- content.asByteArray()
- .defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes)));
+ Mono<HttpResponse> httpResponseMono = response(request);
+ return Option.of(retryConfig)
+ .map(rc -> retryConfig(rc, request.diagnosticContext()))
+ .map(httpResponseMono::retryWhen)
+ .getOrElse(() -> httpResponseMono);
}
ResponseReceiver<?> prepareRequest(HttpRequest request) {
@@ -65,6 +78,27 @@ public class RxHttpClient {
return prepareBody(request, theClient);
}
+ private Mono<HttpResponse> response(HttpRequest request) {
+ return prepareRequest(request)
+ .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+ }
+
+ private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+ if (shouldRetry(status.code())) {
+ return Mono.error(new RetryConfig.RetryableException());
+ }
+ return content.asByteArray()
+ .defaultIfEmpty(new byte[0])
+ .map(bytes -> new NettyHttpResponse(url, status, bytes));
+ }
+
+ private boolean shouldRetry(int code) {
+ return Option.of(retryConfig)
+ .map(RetryConfig::retryableHttpResponseCodes)
+ .getOrElse(HashSet::empty)
+ .contains(code);
+ }
+
private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
if (request.body() == null) {
return prepareBodyWithoutContents(request, theClient);
@@ -79,7 +113,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -87,7 +121,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -114,4 +148,22 @@ public class RxHttpClient {
context.withSlf4jMdc(LOGGER.isDebugEnabled(),
() -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
}
+
+ private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
+ RetryBackoffSpec retry = Retry
+ .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
+ .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
+ LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
+ .filter(ex -> isRetryable(retryConfig, ex));
+
+ return Option.of(retryConfig.onRetryExhaustedException())
+ .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
+ .getOrElse(retry);
+ }
+
+ private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
+ return retryConfig.retryableExceptions()
+ .toStream()
+ .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
+ }
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
index 9b23f1d9..118df52b 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import io.netty.handler.ssl.SslContext;
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys;
@@ -42,23 +44,53 @@ public final class RxHttpClientFactory {
return new RxHttpClient(HttpClient.create());
}
+ public static RxHttpClient create(RxHttpClientConfig config) {
+ return createWithConfig(HttpClient.create(), config);
+ }
public static RxHttpClient create(SecurityKeys securityKeys) {
final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
return create(context);
}
+ public static RxHttpClient create(SecurityKeys securityKeys, RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
+ return create(context, config);
+ }
+
public static RxHttpClient create(TrustStoreKeys trustStoreKeys) {
final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
return create(context);
}
+ public static RxHttpClient create(TrustStoreKeys trustStoreKeys, RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
+ return create(context, config);
+ }
+
public static RxHttpClient createInsecure() {
final SslContext context = SSL_FACTORY.createInsecureClientContext();
return create(context);
}
+ public static RxHttpClient createInsecure(RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createInsecureClientContext();
+ return create(context, config);
+ }
+
private static RxHttpClient create(@NotNull SslContext sslContext) {
- return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+ HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+ return new RxHttpClient(secure);
+ }
+
+ private static RxHttpClient create(@NotNull SslContext sslContext, RxHttpClientConfig config) {
+ HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+ return createWithConfig(secure, config);
+ }
+
+ private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) {
+ return Option.of(config.retryConfig())
+ .map(retryConfig -> new RxHttpClient(httpClient, retryConfig))
+ .getOrElse(() -> new RxHttpClient(httpClient));
}
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
new file mode 100644
index 00000000..a0ae1991
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import io.vavr.collection.HashSet;
+import io.vavr.collection.Set;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface RetryConfig {
+
+ int retryCount();
+
+ Duration retryInterval();
+
+ @Value.Default
+ default Set<Integer> retryableHttpResponseCodes() {
+ return HashSet.empty();
+ }
+
+ @Value.Default
+ default Set<Class<? extends Throwable>> customRetryableExceptions() {
+ return HashSet.empty();
+ }
+
+ @Value.Derived
+ default Set<Class<? extends Throwable>> retryableExceptions() {
+ Set<Class<? extends Throwable>> result = customRetryableExceptions();
+ if (retryableHttpResponseCodes().nonEmpty()) {
+ result = result.add(RetryableException.class);
+ }
+ return result;
+ }
+
+ @Nullable RuntimeException onRetryExhaustedException();
+
+ class RetryableException extends RuntimeException {}
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java
new file mode 100644
index 00000000..78a88a47
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+@Value.Immutable
+public interface RxHttpClientConfig {
+ @Nullable RetryConfig retryConfig();
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
index 4795b00f..8ac0d1d5 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +21,8 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test;
import io.vavr.CheckedFunction0;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import io.vavr.Tuple3;
+import io.vavr.control.Try;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +32,13 @@ import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerResponse;
import reactor.netty.http.server.HttpServerRoutes;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since February 2019
@@ -63,11 +67,26 @@ public class DummyHttpServer {
return responses[state.getAndIncrement()];
}
+ public static Publisher<Void> sendInOrderWithDelay(AtomicInteger counter, Tuple3<HttpServerResponse, Integer, Duration>... responses) {
+ Tuple3<HttpServerResponse, Integer, Duration> tuple = responses[counter.get()];
+ HttpServerResponse httpServerResponse = tuple._1;
+ Integer statusCode = tuple._2;
+ long timeout = tuple._3.toMillis();
+ Try.run(() -> Thread.sleep(timeout));
+ counter.incrementAndGet();
+ return sendString(httpServerResponse.status(statusCode), Mono.just("OK"));
+ }
+
+ public static Publisher<Void> sendWithDelay(HttpServerResponse response, int statusCode, Duration timeout) {
+ Try.run(() -> Thread.sleep(timeout.toMillis()));
+ return sendString(response.status(statusCode), Mono.just("OK"));
+ }
+
public static Publisher<Void> sendResource(HttpServerResponse httpServerResponse, String resourcePath) {
return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath)));
}
- public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){
+ public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message) {
return sendString(httpServerResponse.status(statusCode), Mono.just(message));
}
@@ -79,6 +98,11 @@ public class DummyHttpServer {
server.disposeNow();
}
+ public DummyHttpServer closeAndGet() {
+ server.disposeNow();
+ return this;
+ }
+
public String host() {
return server.host();
}
diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
index 6f3a0909..daf04c6e 100644
--- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
+++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,33 +22,55 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.Tuple;
+import io.vavr.collection.HashSet;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
class RxHttpClientIT {
private static final Duration TIMEOUT = Duration.ofHours(5);
- private final RxHttpClient cut = RxHttpClientFactory.create();
- private static DummyHttpServer httpServer;
-
- @BeforeAll
- static void setUpClass() {
- httpServer = DummyHttpServer.start(routes -> routes
- .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
- .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1))))
+ private static final Duration NO_DELAY = Duration.ofSeconds(0);
+ private static final int RETRY_COUNT = 1;
+ private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
+ private static final DummyHttpServer HTTP_SERVER = initialize();
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final Mono<String> OK = Mono.just("OK");
+ private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
+ private static AtomicInteger REQUEST_COUNTER;
+
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
+ .get("/sample-get", (req, resp) -> sendString(resp, OK))
+ .get("/delay-get", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
.get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
+ .get("/retry-get-500", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER,
+ Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
+ .get("/retry-get-400", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
+ .get("/retry-get-500-200", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER,
+ Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
+ .get("/retry-get-200", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
.post("/headers-post", (req, resp) -> resp
.sendString(Mono.just(req.requestHeaders().toString())))
.post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
@@ -57,12 +79,7 @@ class RxHttpClientIT {
@AfterAll
static void tearDownClass() {
- httpServer.close();
- }
-
- private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
- return ImmutableHttpRequest.builder()
- .url(new URL("http", httpServer.host(), httpServer.port(), path).toString());
+ HTTP_SERVER.close();
}
@Test
@@ -71,6 +88,7 @@ class RxHttpClientIT {
final HttpRequest httpRequest = requestFor("/sample-get")
.method(HttpMethod.GET)
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -90,6 +108,7 @@ class RxHttpClientIT {
final HttpRequest httpRequest = requestFor("/sample-get-500")
.method(HttpMethod.GET)
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -110,6 +129,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.fromString(requestBody))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -131,6 +151,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.chunkedFromString(Mono.just(requestBody)))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -155,6 +176,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.fromString(requestBody))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -174,10 +196,12 @@ class RxHttpClientIT {
@Test
void getWithTimeoutError() throws Exception {
// given
- final HttpRequest httpRequest = requestFor("/delayed-get")
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/delay-get")
.method(HttpMethod.GET)
- .timeout(Duration.ofSeconds(1))
+ .timeout(Duration.ofMillis(1))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
// when
final Mono<HttpResponse> response = cut.call(httpRequest);
@@ -186,5 +210,208 @@ class RxHttpClientIT {
StepVerifier.create(response)
.expectError(ReadTimeoutException.class)
.verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .customRetryableExceptions(HashSet.of(ConnectException.class))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .customRetryableExceptions(HashSet.of(ConnectException.class))
+ .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(ReadTimeoutException.class)
+ .verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithRetryExhaustedExceptionWhen500() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(ReadTimeoutException.class)
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithRetryWhen500AndThen200() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500-200")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<String> bodyAsString = cut.call(httpRequest)
+ .doOnNext(HttpResponse::throwIfUnsuccessful)
+ .map(HttpResponse::bodyAsString);
+
+ // then
+ StepVerifier.create(bodyAsString)
+ .expectNext("OK")
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithoutRetryWhen200() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-200")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<String> bodyAsString = cut.call(httpRequest)
+ .doOnNext(HttpResponse::throwIfUnsuccessful)
+ .map(HttpResponse::bodyAsString);
+
+ // then
+ StepVerifier.create(bodyAsString)
+ .expectNext("OK")
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertNoRetry();
+ }
+
+ @Test
+ void getWithoutRetryWhen400() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-400")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ Mono<HttpResponse> result = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(result)
+ .consumeNextWith(this::assert400)
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertNoRetry();
+ }
+
+ private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
+ return ImmutableHttpRequest.builder()
+ .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
+ }
+
+ private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
+ return ImmutableHttpRequest.builder()
+ .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
+ }
+
+ private ImmutableRetryConfig.Builder defaultRetryConfig() {
+ return ImmutableRetryConfig.builder()
+ .retryCount(RETRY_COUNT)
+ .retryInterval(RETRY_INTERVAL);
+ }
+
+ private void assertRetry() {
+ assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
+ }
+
+ private void assertNoRetry() {
+ assertThat(REQUEST_COUNTER.get()).isOne();
+ }
+
+ private void assertNoServerResponse() {
+ assertThat(REQUEST_COUNTER.get()).isZero();
+ }
+
+ private void assert400(HttpResponse httpResponse) {
+ assertThat(httpResponse.statusCode()).isEqualTo(400);
}
}