diff options
15 files changed, 464 insertions, 45 deletions
diff --git a/Changelog.md b/Changelog.md index 619ed7ba..bfaeb9f5 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,10 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [1.7.0] - 10/02/2021 +## [1.7.0] - 25/02/2021 ### Added - [DCAEGEN2-1483] (https://jira.onap.org/browse/DCAEGEN2-1483) - VESCollector Event ordering - Add possibility to modify the configuration for persistent connection + - Support retry-after header in DCAE-SDK DMaaP-Client ## [1.6.0] ## - Add configurable timeout in dmaap-client diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 40cf7100..c9f92717 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonObject; +import io.vavr.collection.HashMultimap; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -58,6 +59,7 @@ class CbsClientImplTest { .url("http://xxx") .statusCode(200) .rawBody("{}".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); @@ -74,4 +76,4 @@ class CbsClientImplTest { .build()); assertThat(result.toString()).isEqualTo(httpResponse.bodyAsString()); } -}
\ No newline at end of file +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 2fde441d..6c6ded16 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -28,6 +28,7 @@ import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashMultimap; import io.vavr.collection.List; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -545,6 +546,7 @@ class MessageRouterPublisherImplTest { .url(TOPIC_URL) .statusReason(statusReason) .rawBody("[]".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 74b21ad6..006965c2 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashMultimap; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; @@ -72,24 +73,28 @@ class MessageRouterSubscriberImplTest { .statusReason("OK") .url(sourceDefinition.topicUrl()) .rawBody("[]".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder() .statusCode(500) .statusReason("Something braked") .url(sourceDefinition.topicUrl()) .rawBody("[]".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder() .statusCode(301) .statusReason("Something braked") .url(sourceDefinition.topicUrl()) .rawBody("[]".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); private final HttpResponse httpResponseWithIncorrectJson = ImmutableHttpResponse.builder() .statusCode(200) .statusReason("OK") .url(sourceDefinition.topicUrl()) .rawBody("{}".getBytes()) + .headers(HashMultimap.withSeq().empty()) .build(); @Test diff --git a/rest-services/http-client/pom.xml b/rest-services/http-client/pom.xml index 6b0ab0c1..4948fbdd 100644 --- a/rest-services/http-client/pom.xml +++ b/rest-services/http-client/pom.xml @@ -3,7 +3,7 @@ ~ ============LICENSE_START==================================== ~ DCAEGEN2-SERVICES-SDK ~ ========================================================= - ~ Copyright (C) 2019-2020 Nokia. All rights reserved. + ~ Copyright (C) 2019-2021 Nokia. All rights reserved. ~ ========================================================= ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -74,6 +74,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java index ce100478..b6cc7c2d 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +21,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import com.google.gson.Gson; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import io.vavr.collection.Multimap; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -39,6 +40,8 @@ public interface HttpResponse { byte[] rawBody(); + Multimap<String, String> headers(); + @Value.Default default String statusReason() { return ""; diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java index 3dcd7098..c4c8ac8d 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpStatusClass; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.HashMultimap; +import io.vavr.collection.Multimap; +import reactor.netty.http.client.HttpClientResponse; + import java.nio.charset.Charset; +import java.util.List; +import java.util.stream.Collectors; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -32,11 +41,13 @@ class NettyHttpResponse implements HttpResponse { private final String url; private final HttpResponseStatus status; + private final HttpHeaders headers; private final byte[] body; - NettyHttpResponse(String url, HttpResponseStatus status, byte[] body) { + public NettyHttpResponse(String url, HttpClientResponse response, byte[] body) { this.url = url; - this.status = status; + this.status = response.status(); + this.headers = response.responseHeaders(); this.body = body; } @@ -66,6 +77,14 @@ class NettyHttpResponse implements HttpResponse { } @Override + public Multimap<String, String> headers() { + List<Tuple2<String, String>> httpHeaders = headers.entries().stream() + .map(entry -> Tuple.of(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + return HashMultimap.withSeq().ofEntries(httpHeaders); + } + + @Override public String bodyAsString(Charset charset) { return new String(body, charset); } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index d25d7469..341aaf56 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -20,23 +20,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogic; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.netty.ByteBufMono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; -import reactor.util.retry.Retry; -import reactor.util.retry.RetryBackoffSpec; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -46,21 +44,21 @@ public class RxHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); private final HttpClient httpClient; - private RetryConfig retryConfig; + private RetryLogic retryLogic; RxHttpClient(HttpClient httpClient) { - this.httpClient = httpClient; + this.httpClient = Objects.requireNonNull(httpClient, "httpClient must not be null"); } - RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) { + RxHttpClient(HttpClient httpClient, RetryLogic retryLogic) { this(httpClient); - this.retryConfig = retryConfig; + this.retryLogic = retryLogic; } public Mono<HttpResponse> call(HttpRequest request) { Mono<HttpResponse> httpResponseMono = response(request); - return Option.of(retryConfig) - .map(rc -> retryConfig(rc, request.diagnosticContext())) + return Option.of(retryLogic) + .map(rc -> rc.retry(request.diagnosticContext())) .map(httpResponseMono::retryWhen) .getOrElse(() -> httpResponseMono); } @@ -80,13 +78,13 @@ public class RxHttpClient { private Mono<HttpResponse> response(HttpRequest request) { return prepareRequest(request) - .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content)); + .responseSingle((resp, content) -> mapResponse(request.url(), resp, content)); } - private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { + private Mono<HttpResponse> mapResponse(String url, HttpClientResponse response, ByteBufMono content) { return content.asByteArray() .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(url, status, bytes)) + .map(bytes -> new NettyHttpResponse(url, response, bytes)) .map(this::validatedResponse); } @@ -98,10 +96,9 @@ public class RxHttpClient { } private boolean shouldRetry(int code) { - return Option.of(retryConfig) - .map(RetryConfig::retryableHttpResponseCodes) - .getOrElse(HashSet::empty) - .contains(code); + return Option.of(retryLogic) + .map(rc -> rc.shouldRetry(code)) + .getOrElse(Boolean.FALSE); } private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) { @@ -153,19 +150,4 @@ public class RxHttpClient { context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); } - - private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { - return Retry - .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) - .doBeforeRetry(retrySignal -> context.withSlf4jMdc( - LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) - .filter(ex -> isRetryable(retryConfig, ex)) - .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); - } - - private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { - return retryConfig.retryableExceptions() - .toStream() - .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); - } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java index 90b8ff16..8634f146 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java @@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.vavr.control.Option; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry.RetryLogicFactory; import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys; import reactor.netty.http.client.HttpClient; @@ -81,7 +82,7 @@ public final class RxHttpClientFactory { private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) { return Option.of(config.retryConfig()) - .map(retryConfig -> new RxHttpClient(httpClient, retryConfig)) + .map(retryConfig -> new RxHttpClient(httpClient, RetryLogicFactory.create(retryConfig))) .getOrElse(() -> new RxHttpClient(httpClient)); } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java index aa48497a..6d286a6f 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java @@ -22,12 +22,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import java.util.Objects; + public class RetryableException extends RuntimeException { private final HttpResponse response; public RetryableException(HttpResponse response) { - this.response = response; + this.response = Objects.requireNonNull(response, "response must not be null"); } public HttpResponse getResponse() { diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java new file mode 100644 index 00000000..0e2c875a --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryIntervalExtractor.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vavr.Tuple; +import io.vavr.Value; +import io.vavr.collection.Multimap; +import io.vavr.control.Option; +import io.vavr.control.Try; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; + +import java.time.Duration; + +class RetryIntervalExtractor { + + private static final String RETRY_AFTER_HEADER = HttpHeaderNames.RETRY_AFTER.toString(); + private static final int PAYLOAD_TOO_LARGE_HTTP_CODE = HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.code(); + + Option<Duration> extractDelay(HttpResponse response) { + return response.statusCode() == PAYLOAD_TOO_LARGE_HTTP_CODE + ? extractDelay(response.headers()) + : Option.none(); + } + + private Option<Duration> extractDelay(Multimap<String, String> headers) { + return headers + .map((key, value) -> Tuple.of(key.toLowerCase(), value)) + .get(RETRY_AFTER_HEADER) + .toStream() + .flatMap(Value::toStream) + .map(this::parse) + .find(d -> d >= 0) + .map(Duration::ofSeconds); + } + + private int parse(String str) { + return Try.of(() -> Integer.parseInt(str)).getOrElse(-1); + } +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java new file mode 100644 index 00000000..fd420843 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogic.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry; + +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.Objects; + +public class RetryLogic { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryLogic.class); + + private final RetryConfig retryConfig; + private final RetryIntervalExtractor delayExtractor; + + public RetryLogic(RetryConfig retryConfig, RetryIntervalExtractor delayExtractor) { + this.retryConfig = Objects.requireNonNull(retryConfig, "retryConfig must not be null"); + this.delayExtractor = Objects.requireNonNull(delayExtractor, "delayExtractor must not be null"); + } + + public Retry retry(RequestDiagnosticContext requestDiagnosticContext) { + return Retry + .max(retryConfig.retryCount()) + .doAfterRetryAsync(rc -> Mono.delay(calculateDelay(rc.failure())).then()) + .doBeforeRetry(retrySignal -> requestDiagnosticContext.withSlf4jMdc( + LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) + .filter(ex -> isRetryable(retryConfig, ex)) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); + } + + public boolean shouldRetry(int code) { + return retryConfig.retryableHttpResponseCodes().contains(code); + } + + private Duration calculateDelay(Throwable tx) { + Duration retryInterval = retryConfig.retryInterval(); + if (tx instanceof RetryableException) { + RetryableException ex = (RetryableException) tx; + retryInterval = delayExtractor.extractDelay(ex.getResponse()) + .getOrElse(retryInterval); + } + return retryInterval; + } + + private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { + return retryConfig.retryableExceptions() + .toStream() + .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); + } +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java new file mode 100644 index 00000000..51acbe12 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicFactory.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry; + +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; + +public final class RetryLogicFactory { + + private RetryLogicFactory() { + } + + public static RetryLogic create(RetryConfig config) { + return new RetryLogic(config, new RetryIntervalExtractor()); + } +} diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java new file mode 100644 index 00000000..d5759b26 --- /dev/null +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/DelayExtractorTest.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry; + +import io.vavr.Tuple; +import io.vavr.collection.HashMultimap; +import io.vavr.control.Option; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +class DelayExtractorTest { + + private static final HttpResponse DEFAULT = ImmutableHttpResponse.builder() + .url("") + .statusCode(0) + .rawBody("".getBytes()) + .headers(HashMultimap.withSeq().empty()) + .build(); + + private static final RetryIntervalExtractor DELAY_EXTRACTOR = new RetryIntervalExtractor(); + + @Test + void shouldExtractValueFromFirstValidHeaderWhenStatusCode413() { + // given + HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT) + .withStatusCode(413) + .withHeaders(HashMultimap.withSeq().ofEntries( + Tuple.of("Any", "12"), + Tuple.of("Retry-After", "15"), + Tuple.of("Retry-After", "100") + )); + + // when + Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response); + + // then + assertThat(delay.get()).isEqualTo(Duration.ofSeconds(15)); + } + + @ParameterizedTest + @ValueSource(ints = {100, 200, 300, 400, 500}) + void shouldExtractNoValueWhenStatusCodeDifferentThan413(int statusCode) { + // given + HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT) + .withStatusCode(statusCode); + + // when + Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response); + + // then + assertThat(delay).isEqualTo(Option.none()); + } + + @ParameterizedTest + @CsvSource({ + "Retry-After,", + "Retry-After,invalid", + "Retry-After,999999999999", + "Any,12"}) + void shouldExtractNoValueWhenStatusCode413AndNoValidHeader(String key, String value) { + // given + HttpResponse response = ImmutableHttpResponse.copyOf(DEFAULT) + .withStatusCode(413) + .withHeaders(HashMultimap.withSeq().ofEntries(Tuple.of(key, value))); + + // when + Option<Duration> delay = DELAY_EXTRACTOR.extractDelay(response); + + // then + assertThat(delay).isEqualTo(Option.none()); + } +} diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java new file mode 100644 index 00000000..8319d3ae --- /dev/null +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/retry/RetryLogicTest.java @@ -0,0 +1,132 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.retry; + +import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashMultimap; +import io.vavr.collection.HashSet; +import io.vavr.control.Option; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.net.ConnectException; +import java.time.Duration; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class RetryLogicTest { + + private static final HashSet<Integer> RETRYABLE_HTTP_RESPONSE_CODES = + HashSet.of(404, 408, 413, 429, 500, 502, 503, 504); + private static final HashSet<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = + HashSet.of(ReadTimeoutException.class, ConnectException.class); + private static final Duration RETRY_INTERVAL = Duration.ofSeconds(5); + private static final int RETRY_COUNT = 3; + private static final Duration RETRY_EXHAUSTED = Duration.ofSeconds(RETRY_COUNT * RETRY_INTERVAL.getSeconds()); + private static final RetryConfig RETRY_CONFIG = ImmutableRetryConfig.builder() + .retryCount(RETRY_COUNT) + .retryInterval(RETRY_INTERVAL) + .retryableHttpResponseCodes(RETRYABLE_HTTP_RESPONSE_CODES) + .customRetryableExceptions(RETRYABLE_EXCEPTIONS) + .build(); + + private final RequestDiagnosticContext dummyContext = mock(RequestDiagnosticContext.class); + private final RetryIntervalExtractor retryIntervalExtractor = mock(RetryIntervalExtractor.class); + private RetryLogic retryLogic; + + @BeforeEach + void setUp() { + retryLogic = new RetryLogic(RETRY_CONFIG, retryIntervalExtractor); + } + + @Test + void shouldRetryWhenRetryableException() { + // when + Mono<?> mono = Mono + .error(ReadTimeoutException.INSTANCE) + .retryWhen(retryLogic.retry(dummyContext)); + + // then + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .expectNoEvent(RETRY_EXHAUSTED) + .expectError(ReadTimeoutException.class) + .verify(); + } + + @Test + void shouldNotRetryWhenUnretryableException() { + // when + Mono<?> mono = Mono + .error(RuntimeException::new) + .retryWhen(retryLogic.retry(dummyContext)); + + // then + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .expectError(RuntimeException.class) + .verify(); + } + + @Test + void shouldUseRetryIntervalFromExtractorWhenRetryableStatusCode() { + // given + HttpResponse httpResponse = httpResponse413(); + Duration retryInterval = Duration.ofSeconds(10); + when(retryIntervalExtractor.extractDelay(httpResponse)) + .thenReturn(Option.of(retryInterval)); + + // when + Mono<?> mono = Mono + .error(() -> new RetryableException(httpResponse)) + .retryWhen(retryLogic.retry(dummyContext)); + + // then + long noEvents = RETRY_COUNT * retryInterval.getSeconds(); + StepVerifier.withVirtualTime(() -> mono) + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(noEvents)) + .expectError(RetryableException.class) + .verify(); + verify(retryIntervalExtractor, times(RETRY_COUNT)).extractDelay(httpResponse); + } + + private ImmutableHttpResponse httpResponse413() { + return ImmutableHttpResponse.builder() + .url("") + .statusCode(413) + .rawBody("".getBytes()) + .headers(HashMultimap.withSeq().empty()) + .build(); + } + +} |