aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java')
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java68
1 files changed, 60 insertions, 8 deletions
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index 77b842d7..d0bdf414 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,18 +17,25 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
+
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vavr.collection.HashSet;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
import java.util.stream.Collectors;
@@ -39,17 +46,23 @@ public class RxHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
private final HttpClient httpClient;
+ private RetryConfig retryConfig;
RxHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
+ RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+ this(httpClient);
+ this.retryConfig = retryConfig;
+ }
+
public Mono<HttpResponse> call(HttpRequest request) {
- return prepareRequest(request)
- .responseSingle((resp, content) ->
- content.asByteArray()
- .defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes)));
+ Mono<HttpResponse> httpResponseMono = response(request);
+ return Option.of(retryConfig)
+ .map(rc -> retryConfig(rc, request.diagnosticContext()))
+ .map(httpResponseMono::retryWhen)
+ .getOrElse(() -> httpResponseMono);
}
ResponseReceiver<?> prepareRequest(HttpRequest request) {
@@ -65,6 +78,27 @@ public class RxHttpClient {
return prepareBody(request, theClient);
}
+ private Mono<HttpResponse> response(HttpRequest request) {
+ return prepareRequest(request)
+ .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+ }
+
+ private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+ if (shouldRetry(status.code())) {
+ return Mono.error(new RetryConfig.RetryableException());
+ }
+ return content.asByteArray()
+ .defaultIfEmpty(new byte[0])
+ .map(bytes -> new NettyHttpResponse(url, status, bytes));
+ }
+
+ private boolean shouldRetry(int code) {
+ return Option.of(retryConfig)
+ .map(RetryConfig::retryableHttpResponseCodes)
+ .getOrElse(HashSet::empty)
+ .contains(code);
+ }
+
private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
if (request.body() == null) {
return prepareBodyWithoutContents(request, theClient);
@@ -79,7 +113,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -87,7 +121,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -114,4 +148,22 @@ public class RxHttpClient {
context.withSlf4jMdc(LOGGER.isDebugEnabled(),
() -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
}
+
+ private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
+ RetryBackoffSpec retry = Retry
+ .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
+ .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
+ LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
+ .filter(ex -> isRetryable(retryConfig, ex));
+
+ return Option.of(retryConfig.onRetryExhaustedException())
+ .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
+ .getOrElse(retry);
+ }
+
+ private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
+ return retryConfig.retryableExceptions()
+ .toStream()
+ .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
+ }
}