aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java')
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java48
1 files changed, 15 insertions, 33 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index d25d7469..341aaf56 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -20,23 +20,21 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vavr.collection.HashSet;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogic;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
-import reactor.util.retry.Retry;
-import reactor.util.retry.RetryBackoffSpec;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -46,21 +44,21 @@ public class RxHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
private final HttpClient httpClient;
- private RetryConfig retryConfig;
+ private RetryLogic retryLogic;
RxHttpClient(HttpClient httpClient) {
- this.httpClient = httpClient;
+ this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null");
}
- RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+ RxHttpClient(HttpClient httpClient, RetryLogic retryLogic) {
this(httpClient);
- this.retryConfig = retryConfig;
+ this.retryLogic = retryLogic;
}
public Mono<HttpResponse> call(HttpRequest request) {
Mono<HttpResponse> httpResponseMono = response(request);
- return Option.of(retryConfig)
- .map(rc -> retryConfig(rc, request.diagnosticContext()))
+ return Option.of(retryLogic)
+ .map(rc -> rc.retry(request.diagnosticContext()))
.map(httpResponseMono::retryWhen)
.getOrElse(() -> httpResponseMono);
}
@@ -80,13 +78,13 @@ public class RxHttpClient {
private Mono<HttpResponse> response(HttpRequest request) {
return prepareRequest(request)
- .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+ .responseSingle((resp, content) -> mapResponse(request.url(), resp, content));
}
- private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+ private Mono<HttpResponse> mapResponse(String url, HttpClientResponse response, ByteBufMono content) {
return content.asByteArray()
.defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(url, status, bytes))
+ .map(bytes -> new NettyHttpResponse(url, response, bytes))
.map(this::validatedResponse);
}
@@ -98,10 +96,9 @@ public class RxHttpClient {
}
private boolean shouldRetry(int code) {
- return Option.of(retryConfig)
- .map(RetryConfig::retryableHttpResponseCodes)
- .getOrElse(HashSet::empty)
- .contains(code);
+ return Option.of(retryLogic)
+ .map(rc -> rc.shouldRetry(code))
+ .getOrElse(Boolean.FALSE);
}
private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
@@ -153,19 +150,4 @@ public class RxHttpClient {
context.withSlf4jMdc(LOGGER.isDebugEnabled(),
() -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
}
-
- private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
- return Retry
- .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
- .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
- LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
- .filter(ex -> isRetryable(retryConfig, ex))
- .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());
- }
-
- private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
- return retryConfig.retryableExceptions()
- .toStream()
- .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
- }
}