diff options
Diffstat (limited to 'rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java')
-rw-r--r-- | rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java | 68 |
1 files changed, 60 insertions, 8 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index 77b842d7..d0bdf414 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,25 @@ * limitations under the License. * ============LICENSE_END===================================== */ + package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; import java.util.stream.Collectors; @@ -39,17 +46,23 @@ public class RxHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); private final HttpClient httpClient; + private RetryConfig retryConfig; RxHttpClient(HttpClient httpClient) { this.httpClient = httpClient; } + RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) { + this(httpClient); + this.retryConfig = retryConfig; + } + public Mono<HttpResponse> call(HttpRequest request) { - return prepareRequest(request) - .responseSingle((resp, content) -> - content.asByteArray() - .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes))); + Mono<HttpResponse> httpResponseMono = response(request); + return Option.of(retryConfig) + .map(rc -> retryConfig(rc, request.diagnosticContext())) + .map(httpResponseMono::retryWhen) + .getOrElse(() -> httpResponseMono); } ResponseReceiver<?> prepareRequest(HttpRequest request) { @@ -65,6 +78,27 @@ public class RxHttpClient { return prepareBody(request, theClient); } + private Mono<HttpResponse> response(HttpRequest request) { + return prepareRequest(request) + .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content)); + } + + private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { + if (shouldRetry(status.code())) { + return Mono.error(new RetryConfig.RetryableException()); + } + return content.asByteArray() + .defaultIfEmpty(new byte[0]) + .map(bytes -> new NettyHttpResponse(url, status, bytes)); + } + + private boolean shouldRetry(int code) { + return Option.of(retryConfig) + .map(RetryConfig::retryableHttpResponseCodes) + .getOrElse(HashSet::empty) + .contains(code); + } + private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { if (request.body() == null) { return prepareBodyWithoutContents(request, theClient); @@ -79,7 +113,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED)) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -87,7 +121,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -114,4 +148,22 @@ public class RxHttpClient { context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); } + + private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { + RetryBackoffSpec retry = Retry + .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) + .doBeforeRetry(retrySignal -> context.withSlf4jMdc( + LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) + .filter(ex -> isRetryable(retryConfig, ex)); + + return Option.of(retryConfig.onRetryExhaustedException()) + .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex)) + .getOrElse(retry); + } + + private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { + return retryConfig.retryableExceptions() + .toStream() + .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); + } } |