summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Changelog.md3
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java6
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java2
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java5
-rw-r--r--rest-services/http-client/pom.xml7
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java9
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java25
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java48
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java3
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java4
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java59
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java75
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java33
-rw-r--r--rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java98
-rw-r--r--rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java132
15 files changed, 464 insertions, 45 deletions
diff --git a/Changelog.md b/Changelog.md
index 619ed7ba..bfaeb9f5 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -4,10 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
-## [1.7.0] - 10/02/2021
+## [1.7.0] - 25/02/2021
### Added
- [DCAEGEN2-1483] (https://jira.onap.org/browse/DCAEGEN2-1483) - VESCollector Event ordering
- Add possibility to modify the configuration for persistent connection
+ - Support retry-after header in DCAE-SDK DMaaP-Client
## [1.6.0] ##
- Add configurable timeout in dmaap-client
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
index 40cf7100..c9f92717 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import com.google.gson.JsonObject;
+import io.vavr.collection.HashMultimap;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -58,6 +59,7 @@ class CbsClientImplTest {
.url("http://xxx")
.statusCode(200)
.rawBody("{}".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
@@ -74,4 +76,4 @@ class CbsClientImplTest {
.build());
assertThat(result.toString()).isEqualTo(httpResponse.bodyAsString());
}
-} \ No newline at end of file
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index 2fde441d..6c6ded16 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -28,6 +28,7 @@ import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashMultimap;
import io.vavr.collection.List;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -545,6 +546,7 @@ class MessageRouterPublisherImplTest {
.url(TOPIC_URL)
.statusReason(statusReason)
.rawBody("[]".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
index 74b21ad6..006965c2 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
import com.google.gson.JsonSyntaxException;
import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashMultimap;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
@@ -72,24 +73,28 @@ class MessageRouterSubscriberImplTest {
.statusReason("OK")
.url(sourceDefinition.topicUrl())
.rawBody("[]".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder()
.statusCode(500)
.statusReason("Something braked")
.url(sourceDefinition.topicUrl())
.rawBody("[]".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder()
.statusCode(301)
.statusReason("Something braked")
.url(sourceDefinition.topicUrl())
.rawBody("[]".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
private final HttpResponse httpResponseWithIncorrectJson = ImmutableHttpResponse.builder()
.statusCode(200)
.statusReason("OK")
.url(sourceDefinition.topicUrl())
.rawBody("{}".getBytes())
+ .headers(HashMultimap.withSeq().empty())
.build();
@Test
diff --git a/rest-services/http-client/pom.xml b/rest-services/http-client/pom.xml
index 6b0ab0c1..4948fbdd 100644
--- a/rest-services/http-client/pom.xml
+++ b/rest-services/http-client/pom.xml
@@ -3,7 +3,7 @@
~ ============LICENSE_START====================================
~ DCAEGEN2-SERVICES-SDK
~ =========================================================
- ~ Copyright (C) 2019-2020 Nokia. All rights reserved.
+ ~ Copyright (C) 2019-2021 Nokia. All rights reserved.
~ =========================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -74,6 +74,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java
index ce100478..b6cc7c2d 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +21,11 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import com.google.gson.Gson;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
+import io.vavr.collection.Multimap;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -39,6 +40,8 @@ public interface HttpResponse {
byte[] rawBody();
+ Multimap<String, String> headers();
+
@Value.Default
default String statusReason() {
return "";
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java
index 3dcd7098..c4c8ac8d 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,18 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import io.vavr.collection.HashMultimap;
+import io.vavr.collection.Multimap;
+import reactor.netty.http.client.HttpClientResponse;
+
import java.nio.charset.Charset;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -32,11 +41,13 @@ class NettyHttpResponse implements HttpResponse {
private final String url;
private final HttpResponseStatus status;
+ private final HttpHeaders headers;
private final byte[] body;
- NettyHttpResponse(String url, HttpResponseStatus status, byte[] body) {
+ public NettyHttpResponse(String url, HttpClientResponse response, byte[] body) {
this.url = url;
- this.status = status;
+ this.status = response.status();
+ this.headers = response.responseHeaders();
this.body = body;
}
@@ -66,6 +77,14 @@ class NettyHttpResponse implements HttpResponse {
}
@Override
+ public Multimap<String, String> headers() {
+ List<Tuple2<String, String>> httpHeaders = headers.entries().stream()
+ .map(entry -> Tuple.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ return HashMultimap.withSeq().ofEntries(httpHeaders);
+ }
+
+ @Override
public String bodyAsString(Charset charset) {
return new String(body, charset);
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index d25d7469..341aaf56 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -20,23 +20,21 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vavr.collection.HashSet;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogic;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
-import reactor.util.retry.Retry;
-import reactor.util.retry.RetryBackoffSpec;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -46,21 +44,21 @@ public class RxHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
private final HttpClient httpClient;
- private RetryConfig retryConfig;
+ private RetryLogic retryLogic;
RxHttpClient(HttpClient httpClient) {
- this.httpClient = httpClient;
+ this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null");
}
- RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+ RxHttpClient(HttpClient httpClient, RetryLogic retryLogic) {
this(httpClient);
- this.retryConfig = retryConfig;
+ this.retryLogic = retryLogic;
}
public Mono<HttpResponse> call(HttpRequest request) {
Mono<HttpResponse> httpResponseMono = response(request);
- return Option.of(retryConfig)
- .map(rc -> retryConfig(rc, request.diagnosticContext()))
+ return Option.of(retryLogic)
+ .map(rc -> rc.retry(request.diagnosticContext()))
.map(httpResponseMono::retryWhen)
.getOrElse(() -> httpResponseMono);
}
@@ -80,13 +78,13 @@ public class RxHttpClient {
private Mono<HttpResponse> response(HttpRequest request) {
return prepareRequest(request)
- .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+ .responseSingle((resp, content) -> mapResponse(request.url(), resp, content));
}
- private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+ private Mono<HttpResponse> mapResponse(String url, HttpClientResponse response, ByteBufMono content) {
return content.asByteArray()
.defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(url, status, bytes))
+ .map(bytes -> new NettyHttpResponse(url, response, bytes))
.map(this::validatedResponse);
}
@@ -98,10 +96,9 @@ public class RxHttpClient {
}
private boolean shouldRetry(int code) {
- return Option.of(retryConfig)
- .map(RetryConfig::retryableHttpResponseCodes)
- .getOrElse(HashSet::empty)
- .contains(code);
+ return Option.of(retryLogic)
+ .map(rc -> rc.shouldRetry(code))
+ .getOrElse(Boolean.FALSE);
}
private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
@@ -153,19 +150,4 @@ public class RxHttpClient {
context.withSlf4jMdc(LOGGER.isDebugEnabled(),
() -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
}
-
- private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
- return Retry
- .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
- .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
- LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
- .filter(ex -> isRetryable(retryConfig, ex))
- .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());
- }
-
- private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
- return retryConfig.retryableExceptions()
- .toStream()
- .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
- }
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
index 90b8ff16..8634f146 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
@@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import io.vavr.control.Option;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogicFactory;
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys;
import reactor.netty.http.client.HttpClient;
@@ -81,7 +82,7 @@ public final class RxHttpClientFactory {
private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) {
return Option.of(config.retryConfig())
- .map(retryConfig -> new RxHttpClient(httpClient, retryConfig))
+ .map(retryConfig -> new RxHttpClient(httpClient, RetryLogicFactory.create(retryConfig)))
.getOrElse(() -> new RxHttpClient(httpClient));
}
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java
index aa48497a..6d286a6f 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java
@@ -22,12 +22,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import java.util.Objects;
+
public class RetryableException extends RuntimeException {
private final HttpResponse response;
public RetryableException(HttpResponse response) {
- this.response = response;
+ this.response = Objects.requireNonNull(response, "response must not be null");
}
public HttpResponse getResponse() {
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java
new file mode 100644
index 00000000..0e2c875a
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vavr.Tuple;
+import io.vavr.Value;
+import io.vavr.collection.Multimap;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+
+import java.time.Duration;
+
+class RetryIntervalExtractor {
+
+ private static final String RETRY_AFTER_HEADER = HttpHeaderNames.RETRY_AFTER.toString();
+ private static final int PAYLOAD_TOO_LARGE_HTTP_CODE = HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.code();
+
+ Option<Duration> extractDelay(HttpResponse response) {
+ return response.statusCode() == PAYLOAD_TOO_LARGE_HTTP_CODE
+ ? extractDelay(response.headers())
+ : Option.none();
+ }
+
+ private Option<Duration> extractDelay(Multimap<String, String> headers) {
+ return headers
+ .map((key, value) -> Tuple.of(key.toLowerCase(), value))
+ .get(RETRY_AFTER_HEADER)
+ .toStream()
+ .flatMap(Value::toStream)
+ .map(this::parse)
+ .find(d -> d >= 0)
+ .map(Duration::ofSeconds);
+ }
+
+ private int parse(String str) {
+ return Try.of(() -> Integer.parseInt(str)).getOrElse(-1);
+ }
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java
new file mode 100644
index 00000000..fd420843
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry;
+
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class RetryLogic {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RetryLogic.class);
+
+ private final RetryConfig retryConfig;
+ private final RetryIntervalExtractor delayExtractor;
+
+ public RetryLogic(RetryConfig retryConfig, RetryIntervalExtractor delayExtractor) {
+ this.retryConfig = Objects.requireNonNull(retryConfig, "retryConfig must not be null");
+ this.delayExtractor = Objects.requireNonNull(delayExtractor, "delayExtractor must not be null");
+ }
+
+ public Retry retry(RequestDiagnosticContext requestDiagnosticContext) {
+ return Retry
+ .max(retryConfig.retryCount())
+ .doAfterRetryAsync(rc -> Mono.delay(calculateDelay(rc.failure())).then())
+ .doBeforeRetry(retrySignal -> requestDiagnosticContext.withSlf4jMdc(
+ LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
+ .filter(ex -> isRetryable(retryConfig, ex))
+ .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());
+ }
+
+ public boolean shouldRetry(int code) {
+ return retryConfig.retryableHttpResponseCodes().contains(code);
+ }
+
+ private Duration calculateDelay(Throwable tx) {
+ Duration retryInterval = retryConfig.retryInterval();
+ if (tx instanceof RetryableException) {
+ RetryableException ex = (RetryableException) tx;
+ retryInterval = delayExtractor.extractDelay(ex.getResponse())
+ .getOrElse(retryInterval);
+ }
+ return retryInterval;
+ }
+
+ private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
+ return retryConfig.retryableExceptions()
+ .toStream()
+ .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
+ }
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java
new file mode 100644
index 00000000..51acbe12
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry;
+
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+
+public final class RetryLogicFactory {
+
+ private RetryLogicFactory() {
+ }
+
+ public static RetryLogic create(RetryConfig config) {
+ return new RetryLogic(config, new RetryIntervalExtractor());
+ }
+}
diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java
new file mode 100644
index 00000000..d5759b26
--- /dev/null
+++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry;
+
+import io.vavr.Tuple;
+import io.vavr.collection.HashMultimap;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DelayExtractorTest {
+
+ private static final HttpResponse DEFAULT = ImmutableHttpResponse.builder()
+ .url("")
+ .statusCode(0)
+ .rawBody("".getBytes())
+ .headers(HashMultimap.withSeq().empty())
+ .build();
+
+ private static final RetryIntervalExtractor DELAY_EXTRACTOR = new RetryIntervalExtractor();
+
+ @Test
+ void shouldExtractValueFromFirstValidHeaderWhenStatusCode413() {
+ // given
+ HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT)
+ .withStatusCode(413)
+ .withHeaders(HashMultimap.withSeq().ofEntries(
+ Tuple.of("Any", "12"),
+ Tuple.of("Retry-After", "15"),
+ Tuple.of("Retry-After", "100")
+ ));
+
+ // when
+ Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response);
+
+ // then
+ assertThat(delay.get()).isEqualTo(Duration.ofSeconds(15));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {100, 200, 300, 400, 500})
+ void shouldExtractNoValueWhenStatusCodeDifferentThan413(int statusCode) {
+ // given
+ HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT)
+ .withStatusCode(statusCode);
+
+ // when
+ Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response);
+
+ // then
+ assertThat(delay).isEqualTo(Option.none());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "Retry-After,",
+ "Retry-After,invalid",
+ "Retry-After,999999999999",
+ "Any,12"})
+ void shouldExtractNoValueWhenStatusCode413AndNoValidHeader(String key, String value) {
+ // given
+ HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT)
+ .withStatusCode(413)
+ .withHeaders(HashMultimap.withSeq().ofEntries(Tuple.of(key, value)));
+
+ // when
+ Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response);
+
+ // then
+ assertThat(delay).isEqualTo(Option.none());
+ }
+}
diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java
new file mode 100644
index 00000000..8319d3ae
--- /dev/null
+++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry;
+
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashMultimap;
+import io.vavr.collection.HashSet;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.net.ConnectException;
+import java.time.Duration;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class RetryLogicTest {
+
+ private static final HashSet<Integer> RETRYABLE_HTTP_RESPONSE_CODES =
+ HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
+ private static final HashSet<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS =
+ HashSet.of(ReadTimeoutException.class, ConnectException.class);
+ private static final Duration RETRY_INTERVAL = Duration.ofSeconds(5);
+ private static final int RETRY_COUNT = 3;
+ private static final Duration RETRY_EXHAUSTED = Duration.ofSeconds(RETRY_COUNT * RETRY_INTERVAL.getSeconds());
+ private static final RetryConfig RETRY_CONFIG = ImmutableRetryConfig.builder()
+ .retryCount(RETRY_COUNT)
+ .retryInterval(RETRY_INTERVAL)
+ .retryableHttpResponseCodes(RETRYABLE_HTTP_RESPONSE_CODES)
+ .customRetryableExceptions(RETRYABLE_EXCEPTIONS)
+ .build();
+
+ private final RequestDiagnosticContext dummyContext = mock(RequestDiagnosticContext.class);
+ private final RetryIntervalExtractor retryIntervalExtractor = mock(RetryIntervalExtractor.class);
+ private RetryLogic retryLogic;
+
+ @BeforeEach
+ void setUp() {
+ retryLogic = new RetryLogic(RETRY_CONFIG, retryIntervalExtractor);
+ }
+
+ @Test
+ void shouldRetryWhenRetryableException() {
+ // when
+ Mono<?> mono = Mono
+ .error(ReadTimeoutException.INSTANCE)
+ .retryWhen(retryLogic.retry(dummyContext));
+
+ // then
+ StepVerifier.withVirtualTime(() -> mono)
+ .expectSubscription()
+ .expectNoEvent(RETRY_EXHAUSTED)
+ .expectError(ReadTimeoutException.class)
+ .verify();
+ }
+
+ @Test
+ void shouldNotRetryWhenUnretryableException() {
+ // when
+ Mono<?> mono = Mono
+ .error(RuntimeException::new)
+ .retryWhen(retryLogic.retry(dummyContext));
+
+ // then
+ StepVerifier.withVirtualTime(() -> mono)
+ .expectSubscription()
+ .expectError(RuntimeException.class)
+ .verify();
+ }
+
+ @Test
+ void shouldUseRetryIntervalFromExtractorWhenRetryableStatusCode() {
+ // given
+ HttpResponse httpResponse = httpResponse413();
+ Duration retryInterval = Duration.ofSeconds(10);
+ when(retryIntervalExtractor.extractDelay(httpResponse))
+ .thenReturn(Option.of(retryInterval));
+
+ // when
+ Mono<?> mono = Mono
+ .error(() -> new RetryableException(httpResponse))
+ .retryWhen(retryLogic.retry(dummyContext));
+
+ // then
+ long noEvents = RETRY_COUNT * retryInterval.getSeconds();
+ StepVerifier.withVirtualTime(() -> mono)
+ .expectSubscription()
+ .expectNoEvent(Duration.ofSeconds(noEvents))
+ .expectError(RetryableException.class)
+ .verify();
+ verify(retryIntervalExtractor, times(RETRY_COUNT)).extractDelay(httpResponse);
+ }
+
+ private ImmutableHttpResponse httpResponse413() {
+ return ImmutableHttpResponse.builder()
+ .url("")
+ .statusCode(413)
+ .rawBody("".getBytes())
+ .headers(HashMultimap.withSeq().empty())
+ .build();
+ }
+
+}