diff options
Diffstat (limited to 'rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java')
-rw-r--r-- | rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java | 48 |
1 files changed, 15 insertions, 33 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index d25d7469..341aaf56 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -20,23 +20,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogic; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.netty.ByteBufMono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; -import reactor.util.retry.Retry; -import reactor.util.retry.RetryBackoffSpec; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -46,21 +44,21 @@ public class RxHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); private final HttpClient httpClient; - private RetryConfig retryConfig; + private RetryLogic retryLogic; RxHttpClient(HttpClient httpClient) { - this.httpClient = httpClient; + this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null"); } - RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) { + RxHttpClient(HttpClient httpClient, RetryLogic retryLogic) { this(httpClient); - this.retryConfig = retryConfig; + this.retryLogic = retryLogic; } public Mono<HttpResponse> call(HttpRequest request) { Mono<HttpResponse> httpResponseMono = response(request); - return Option.of(retryConfig) - .map(rc -> retryConfig(rc, request.diagnosticContext())) + return Option.of(retryLogic) + .map(rc -> rc.retry(request.diagnosticContext())) .map(httpResponseMono::retryWhen) .getOrElse(() -> httpResponseMono); } @@ -80,13 +78,13 @@ public class RxHttpClient { private Mono<HttpResponse> response(HttpRequest request) { return prepareRequest(request) - .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content)); + .responseSingle((resp, content) -> mapResponse(request.url(), resp, content)); } - private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { + private Mono<HttpResponse> mapResponse(String url, HttpClientResponse response, ByteBufMono content) { return content.asByteArray() .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(url, status, bytes)) + .map(bytes -> new NettyHttpResponse(url, response, bytes)) .map(this::validatedResponse); } @@ -98,10 +96,9 @@ public class RxHttpClient { } private boolean shouldRetry(int code) { - return Option.of(retryConfig) - .map(RetryConfig::retryableHttpResponseCodes) - .getOrElse(HashSet::empty) - .contains(code); + return Option.of(retryLogic) + .map(rc -> rc.shouldRetry(code)) + .getOrElse(Boolean.FALSE); } private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { @@ -153,19 +150,4 @@ public class RxHttpClient { context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); } - - private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { - return Retry - .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) - .doBeforeRetry(retrySignal -> context.withSlf4jMdc( - LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) - .filter(ex -> isRetryable(retryConfig, ex)) - .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); - } - - private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { - return retryConfig.retryableExceptions() - .toStream() - .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); - } } |