diff options
author | tkogut <tomasz.kogut@nokia.com> | 2021-01-19 09:00:56 +0100 |
---|---|---|
committer | tkogut <tomasz.kogut@nokia.com> | 2021-01-20 12:20:55 +0100 |
commit | 9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 (patch) | |
tree | 58c9e881f694fde8347762b6c237de9423f33f23 /rest-services/dmaap-client | |
parent | 286637d4a801ab6e933684500509eab308d2e3a6 (diff) |
Support retry in DCAE-SDK DMaaP-Client
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e
Diffstat (limited to 'rest-services/dmaap-client')
23 files changed, 596 insertions, 299 deletions
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index d619590f..b8620311 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -56,6 +56,10 @@ <artifactId>junit-jupiter-engine</artifactId> </dependency> <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + </dependency> + <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> @@ -76,11 +80,10 @@ <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> </dependency> - <dependency> - <groupId>eu.rekawek.toxiproxy</groupId> - <artifactId>toxiproxy-java</artifactId> - <version>${toxiproxy-java.version}</version> - <scope>test</scope> - </dependency> + <dependency> + <groupId>org.mock-server</groupId> + <artifactId>mockserver-client-java</artifactId> + <version>${mockserver-client.version}</version> + </dependency> </dependencies> </project> diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index 3c27da10..9d255559 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,17 +19,28 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import java.time.Duration; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES; + /** - * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ @@ -44,19 +55,37 @@ public final class DmaapClientFactory { return new MessageRouterPublisherImpl( createHttpClient(clientConfiguration), clientConfiguration.maxBatchSize(), - clientConfiguration.maxBatchDuration()); + clientConfiguration.maxBatchDuration(), + new ClientErrorReasonPresenter()); } public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber( @NotNull MessageRouterSubscriberConfig clientConfiguration) { return new MessageRouterSubscriberImpl( createHttpClient(clientConfiguration), - clientConfiguration.gsonInstance()); + clientConfiguration.gsonInstance(), + new ClientErrorReasonPresenter()); } private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) { + RxHttpClientConfig clientConfig = ImmutableRxHttpClientConfig.builder() + .retryConfig(createRetry(config)) + .build(); return config.securityKeys() == null - ? RxHttpClientFactory.create() - : RxHttpClientFactory.create(config.securityKeys()); + ? RxHttpClientFactory.create(clientConfig) + : RxHttpClientFactory.create(config.securityKeys(), clientConfig); + } + + private static RetryConfig createRetry(DmaapClientConfiguration config) { + return Option.of(config.retryConfig()) + .map(rc -> ImmutableRetryConfig.builder() + .retryInterval(Duration.ofSeconds(rc.retryIntervalInSeconds())) + .retryCount(rc.retryCount()) + .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES) + .customRetryableExceptions(RETRYABLE_EXCEPTIONS) + .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION) + .build()) + .getOrNull(); } } + diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java index 6b22b378..1eaae78e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,15 +23,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ClientError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableClientError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableRequestError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableServiceException; public class ClientErrorReasonPresenter { - private ClientErrorReasonPresenter() { } - private static final Gson GSON = new GsonBuilder().create(); private static final String PATTERN = "%s\n%s"; - public static String present(ClientErrorReason clientErrorReason) { + public String present(ClientErrorReason clientErrorReason) { ImmutableServiceException simpleServiceException = ImmutableServiceException.builder() .messageId(clientErrorReason.messageId()) .text(clientErrorReason.text()) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java index 57187c80..d0cb35da 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; import org.immutables.gson.Gson; import org.immutables.value.Value; diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java index 71e673fe..79b9a299 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; import org.immutables.gson.Gson; import org.immutables.value.Value; diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java index e99330ac..a39fbc0b 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; import org.immutables.gson.Gson; import org.immutables.value.Value; diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 16068da0..7d1b0a93 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,12 +61,15 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final RxHttpClient httpClient; private final int maxBatchSize; private final Duration maxBatchDuration; + private final ClientErrorReasonPresenter clientErrorReasonPresenter; + private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); - public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) { + public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) { this.httpClient = httpClient; this.maxBatchSize = maxBatchSize; this.maxBatchDuration = maxBatchDuration; + this.clientErrorReasonPresenter = clientErrorReasonPresenter; } @Override @@ -124,7 +127,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { } private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) { - String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) .build()); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index f7ccf4f2..292a7157 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -20,16 +20,12 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; - import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.List; -import java.nio.charset.StandardCharsets; - import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -48,6 +44,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.nio.charset.StandardCharsets; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 @@ -55,11 +55,14 @@ import reactor.core.publisher.Mono; public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { private final RxHttpClient httpClient; private final Gson gson; + private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class); - public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson) { + public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson, + ClientErrorReasonPresenter clientErrorReasonPresenter) { this.httpClient = httpClient; this.gson = gson; + this.clientErrorReasonPresenter = clientErrorReasonPresenter; } @Override @@ -67,7 +70,8 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { LOGGER.debug("Requesting new items from DMaaP MR: {}", request); return httpClient.call(buildGetHttpRequest(request)) .map(this::buildGetResponse) - .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) + .doOnError(ReadTimeoutException.class, + e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); } @@ -91,7 +95,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { : builder.failReason(extractFailReason(httpResponse)).build(); } - private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){ + private List<JsonElement> getAsJsonElements(HttpResponse httpResponse) { JsonArray bodyAsJsonArray = httpResponse .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class); @@ -104,7 +108,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { } private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) { - String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(failReason) .build()); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java index 95c5e7d1..a5a87fbd 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java @@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** @@ -35,5 +35,5 @@ public interface DmaapRequest { return RequestDiagnosticContext.create(); } - @Nullable TimeoutConfig timeoutConfig(); + @Nullable DmaapTimeoutConfig timeoutConfig(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java index ac677f02..3e283511 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,5 +32,8 @@ public interface DmaapClientConfiguration { default @Nullable SecurityKeys securityKeys() { return null; } - + @Value.Default + default @Nullable DmaapRetryConfig retryConfig(){ + return null; + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java new file mode 100644 index 00000000..f82edfc9 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashSet; +import io.vavr.collection.Set; +import org.immutables.value.Value; + +import java.net.ConnectException; + +@Value.Immutable +public interface DmaapRetryConfig { + + Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class); + RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE; + Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504); + + @Value.Default + default int retryCount() { + return 3; + } + + @Value.Default + default int retryIntervalInSeconds() { + return 1; + } + + @Value.Check + default void validate() { + validateRetryCount(); + validateRetryInterval(); + } + + private void validateRetryCount() { + int rc = retryCount(); + if (rc < 0) + throw new IllegalArgumentException(String.format("Invalid value: %d, retryCount should be (0-n)", rc)); + } + + private void validateRetryInterval() { + long ri = retryIntervalInSeconds(); + if (ri < 1) + throw new IllegalArgumentException(String.format("Invalid value: %d, retryInterval should be (1-n)", ri)); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java index 413bf8e5..0ece899b 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import org.immutables.value.Value; import java.time.Duration; @Value.Immutable -public interface TimeoutConfig { +public interface DmaapTimeoutConfig { @Value.Default default Duration getTimeout() { diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index 2b8027c1..1a315806 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -38,7 +38,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import reactor.core.publisher.Flux; import java.time.Duration; @@ -56,7 +56,7 @@ public final class MessageRouterTestsUtils { return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(createMessageRouterSink(topicUrl)) .contentType(ContentType.APPLICATION_JSON) - .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder() .timeout(timeout) .build()) .build(); @@ -86,7 +86,7 @@ public final class MessageRouterTestsUtils { return ImmutableMessageRouterSubscribeRequest .builder() - .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder() .timeout(timeout) .build()) .sourceDefinition(getImmutableMessageRouterSource(topicUrl)) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java index 494ca62a..5b1984df 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,11 +27,10 @@ import java.net.URL; final class DMaapContainer { private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml"; - private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath( - MR_COMPOSE_RESOURCE_NAME); + private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME); static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; static final String DMAAP_SERVICE_NAME = "dmaap"; - static final int PROXY_SERVICE_EXPOSED_PORT = 8666; + static final int PROXY_MOCK_SERVICE_EXPOSED_PORT = 1080; static final String LOCALHOST = "localhost"; private DMaapContainer() {} @@ -43,11 +42,11 @@ final class DMaapContainer { .withLocalCompose(true); } - private static String getDockerComposeFilePath(String resourceName){ + private static String getDockerComposeFilePath(String resourceName) { URL resource = DMaapContainer.class.getClassLoader() .getResource(resourceName); - if(resource != null) return resource.getFile(); + if (resource != null) return resource.getFile(); else throw new DockerComposeNotFoundException(String .format("File %s does not exist", resourceName)); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 24cd2c34..f6ef94b7 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,16 +22,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import eu.rekawek.toxiproxy.Proxy; -import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.TimeToLive; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.testcontainers.containers.DockerComposeContainer; @@ -41,11 +46,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse; @@ -56,14 +61,19 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; @Testcontainers class MessageRouterPublisherIT { @Container - private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); + private static final DockerComposeContainer CONTAINER = createContainerInstance(); + private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient( + LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + private static String EVENTS_PATH; + private static String PROXY_MOCK_EVENTS_PATH; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n" + "{" @@ -85,22 +95,21 @@ class MessageRouterPublisherIT { + "}" + "}" + "}"; - private static Proxy DMAAP_PROXY; - private static String EVENTS_PATH; - private static String PROXY_EVENTS_PATH; + private final MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); private final MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); @BeforeAll - static void setUp() throws IOException { + static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); - PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); + PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + } - DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", - String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), - String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + @BeforeEach + void set() { + MOCK_SERVER_CLIENT.reset(); } @Test @@ -302,17 +311,18 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException { + void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { //given - final String toxic = "latency-toxic"; - DMAAP_PROXY.toxics() - .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); final String topic = "TOPIC10"; final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage); final MessageRouterPublishRequest mrRequest = createPublishRequest( - String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1)); + String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1)); final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE); + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); //when final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch); @@ -323,7 +333,75 @@ class MessageRouterPublisherIT { .expectComplete() .verify(TIMEOUT); - //cleanup - DMAAP_PROXY.toxics().get(toxic).remove(); + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); + } + + @Test + void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() { + final String topic = "TOPIC11"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + @Test + void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() { + final String topic = "TOPIC12"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 10)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + + //when + final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + private MessageRouterPublisherConfig retryConfig() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(1) + .build()) + .build(); } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index b0a07eda..82b6661c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,18 @@ import com.google.gson.JsonPrimitive; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,8 +44,10 @@ import reactor.test.StepVerifier; import java.time.Duration; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -50,12 +56,14 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC"; + private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY"; private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400"; private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401"; private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403"; private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404"; - private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500"; + private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500"; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); @@ -69,29 +77,22 @@ class MessageRouterPublisherTest { @BeforeAll static void setUp() { server = DummyHttpServer.start(routes -> routes - .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> - sendString(resp, Mono.just("OK"))) - .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> - sendError(resp, 400, ERROR_MESSAGE)) - .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> - sendError(resp, 401, ERROR_MESSAGE)) - .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> - sendError(resp, 403, ERROR_MESSAGE)) - .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> - sendError(resp, 404, ERROR_MESSAGE)) - .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) -> - sendError(resp, 500, ERROR_MESSAGE)) + .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) + .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) + .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE)) + .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE)) + .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE)) + .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE)) + .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)) ); } @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH, - ContentType.TEXT_PLAIN); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH); final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); - //when final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); @@ -102,13 +103,18 @@ class MessageRouterPublisherTest { .verify(TIMEOUT); } - @Test - void publisher_shouldHandleBadRequestError() { + @ParameterizedTest + @CsvSource({ + FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request", + FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized", + FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden", + FAILING_WITH_404_RESP_PATH + "," + "404 Not Found", + FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error" + }) + void publisher_shouldHandleError(String failingPath, String failReason) { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "400 Bad Request\n%s", ERROR_MESSAGE); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath); + final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason); //when final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); @@ -121,83 +127,40 @@ class MessageRouterPublisherTest { } @Test - void publisher_shouldHandleUnauthorizedError() { + void publisher_shouldHandleClientTimeoutError() { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "401 Unauthorized\n%s", ERROR_MESSAGE); + final Duration requestTimeout = Duration.ofMillis(1); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(DELAY_RESP_TOPIC_PATH, requestTimeout); //when final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); //then StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void publisher_shouldHandleForbiddenError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "403 Forbidden\n%s", ERROR_MESSAGE); - - //when - final Flux<MessageRouterPublishResponse> result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) + .consumeNextWith(this::assertTimeoutError) .expectComplete() .verify(TIMEOUT); } - @Test - void publisher_shouldHandleNotFoundError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "404 Not Found\n%s", ERROR_MESSAGE); - - //when - final Flux<MessageRouterPublishResponse> result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .build(); } - @Test - void publisher_shouldHandleInternalServerError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "500 Internal Server Error\n%s", ERROR_MESSAGE); - - //when - final Flux<MessageRouterPublishResponse> result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build()) + .build(); } - - private MessageRouterPublishRequest createMRRequest(String topicPath, ContentType contentType) { - final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + private static MessageRouterSink createMRSink(String topicPath) { + return ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("http://%s:%d%s", server.host(), @@ -205,18 +168,19 @@ class MessageRouterPublisherTest { topicPath) ) .build(); - - return ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) - .contentType(contentType) - .build(); } - private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs) { + private static MessageRouterPublishResponse createErrorResponse(String failReason) { + String failReasonFormat = failReason + "\n%s"; return ImmutableMessageRouterPublishResponse .builder() - .failReason(String.format(failReasonFormat, formatArgs)) + .failReason(String.format(failReasonFormat, ERROR_MESSAGE)) .build(); } + + private void assertTimeoutError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index bd161aab..1f4e499d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -22,14 +22,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import eu.rekawek.toxiproxy.Proxy; -import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.testcontainers.containers.DockerComposeContainer; @@ -39,11 +43,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse; @@ -52,19 +56,23 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; - import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; @Testcontainers class MessageRouterSubscriberIT { + @Container + private static final DockerComposeContainer CONTAINER = createContainerInstance(); + private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient( + LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + private static String EVENTS_PATH; + private static String PROXY_MOCK_EVENTS_PATH; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static String PROXY_EVENTS_PATH; - private static Proxy DMAAP_PROXY; private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + "{" + "\"mrstatus\":3001," + @@ -85,27 +93,21 @@ class MessageRouterSubscriberIT { + "}" + "}"; - @Container - private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); - - private static String EVENTS_PATH; - private MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); private MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - @BeforeAll - static void setUp() throws IOException { + static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); - PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); - - DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", - String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), - String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); } + @BeforeEach + void set() { + MOCK_SERVER_CLIENT.reset(); + } @Test void subscriber_shouldHandleNoSuchTopicException() { @@ -128,7 +130,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriberShouldHandleSingleItemResponse(){ + void subscriberShouldHandleSingleItemResponse() { //given final String topic = "TOPIC"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -207,7 +209,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldSubscribeToTopic(){ + void subscriber_shouldSubscribeToTopic() { //given final String topic = "TOPIC4"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -234,17 +236,16 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldHandleTimeoutException() throws IOException { + void subscriber_shouldHandleTimeoutException() { //given - final String topic = "newTopic"; + final String topic = "TOPIC5"; final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest( - String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); - final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( - TIMEOUT_ERROR_MESSAGE); - - final String toxic = "latency-toxic"; - DMAAP_PROXY.toxics() - .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); + String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE); + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 5)); //when Mono<MessageRouterSubscribeResponse> response = subscriber @@ -256,7 +257,88 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); - //cleanup - DMAAP_PROXY.toxics().get(toxic).remove(); + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); + } + + @Test + void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC6"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + @Test + void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC7"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 10)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + private MessageRouterSubscriberConfig retryConfig() { + return ImmutableMessageRouterSubscriberConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(1) + .build()) + .build(); } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index 18584789..06875394 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,30 +20,34 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; - import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; - -import java.time.Duration; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.netty.http.server.HttpServerRoutes; import reactor.test.StepVerifier; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; + /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 @@ -51,8 +55,10 @@ import reactor.test.StepVerifier; class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String CONSUMER_GROUP = "group1"; private static final String SUCCESS_CONSUMER_ID = "consumer200"; + private static final String DELAY_CONSUMER_ID = "delay200"; private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401"; private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403"; private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409"; @@ -63,6 +69,8 @@ class MessageRouterSubscriberTest { private static final String SUCCESS_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID); + private static final String DELAY_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, DELAY_CONSUMER_ID); private static final String FAILING_WITH_401_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID); private static final String FAILING_WITH_403_RESP_PATH = String @@ -83,7 +91,15 @@ class MessageRouterSubscriberTest { @BeforeAll static void setUp() { - DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberTest::setRoutes); + DummyHttpServer server = DummyHttpServer.start(routes -> routes + .get(SUCCESS_RESP_PATH, (req, resp) -> + sendResource(resp, "/sample-mr-subscribe-response.json")) + .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) + .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE)) + .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE)) + .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE)) + .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) + .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); sourceDefinition = createMessageRouterSource(server); @@ -110,69 +126,19 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } - @Test - void subscriber_shouldGetUnauthorizedErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID); - Mono<MessageRouterSubscribeResponse> response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("401 Unauthorized\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetForbiddenErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID); - Mono<MessageRouterSubscribeResponse> response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("403 Forbidden\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetConflictErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID); + @ParameterizedTest + @CsvSource({ + FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized", + FAILING_WITH_403_CONSUMER_ID + "," + "403 Forbidden", + FAILING_WITH_409_CONSUMER_ID + "," + "409 Conflict", + FAILING_WITH_429_CONSUMER_ID + "," + "429 Too Many Requests", + FAILING_WITH_500_CONSUMER_ID + "," + "500 Internal Server Error" + }) + void subscriber_shouldHandleError(String consumerId, String failReason) { + MessageRouterSubscribeRequest request = createFailingRequest(consumerId); Mono<MessageRouterSubscribeResponse> response = sut.get(request); - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("409 Conflict\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetTooManyRequestsErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID); - Mono<MessageRouterSubscribeResponse> response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("429 Too Many Requests\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetInternalServerErrorResponse() { - Mono<MessageRouterSubscribeResponse> response = sut - .get(mrFailingRequest); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("500 Internal Server Error\n%s", ERROR_MESSAGE)); + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(failReason); StepVerifier.create(response) .expectNext(expectedResponse) @@ -226,20 +192,16 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } - private static HttpServerRoutes setRoutes(HttpServerRoutes routes) { - return routes - .get(SUCCESS_RESP_PATH, (req, resp) -> - sendResource(resp, "/sample-mr-subscribe-response.json")) - .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> - sendError(resp, 401, ERROR_MESSAGE)) - .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> - sendError(resp, 403, ERROR_MESSAGE)) - .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> - sendError(resp, 409, ERROR_MESSAGE)) - .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> - sendError(resp, 429, ERROR_MESSAGE)) - .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> - sendError(resp, 500, ERROR_MESSAGE)); + @Test + void subscriber_shouldHandleClientTimeoutError() { + Duration requestTimeout = Duration.ofMillis(1); + MessageRouterSubscribeRequest request = createDelayRequest(DELAY_CONSUMER_ID, requestTimeout); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + StepVerifier.create(response) + .consumeNextWith(this::assertTimeoutError) + .expectComplete() + .verify(TIMEOUT); } private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { @@ -257,6 +219,15 @@ class MessageRouterSubscriberTest { .build(); } + private static MessageRouterSubscribeRequest createDelayRequest(String consumerId, Duration timeout) { + return ImmutableMessageRouterSubscribeRequest.builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(CONSUMER_GROUP) + .consumerId(consumerId) + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build()) + .build(); + } + private static MessageRouterSubscribeRequest createFailingRequest(String consumerId) { return ImmutableMessageRouterSubscribeRequest .builder() @@ -266,11 +237,17 @@ class MessageRouterSubscriberTest { .build(); } - private static MessageRouterSubscribeResponse createErrorResponse(String failReason) { + private MessageRouterSubscribeResponse createErrorResponse(String failReason) { + String failReasonFormat = failReason + "\n%s"; return ImmutableMessageRouterSubscribeResponse .builder() - .failReason(failReason) + .failReason(String.format(failReasonFormat, ERROR_MESSAGE)) .build(); } + + private void assertTimeoutError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java index 9b318d73..76d7a381 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ class ClientErrorReasonPresenterTest { + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}"; //when - String actual = ClientErrorReasonPresenter.present(clientErrorReason); + String actual = new ClientErrorReasonPresenter().present(clientErrorReason); //then assertThat(actual).isEqualTo(expected); @@ -48,7 +48,7 @@ class ClientErrorReasonPresenterTest { + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}"; //when - String actual = ClientErrorReasonPresenter.present(clientErrorReason); + String actual = new ClientErrorReasonPresenter().present(clientErrorReason); //then assertThat(actual).isEqualTo(expected); diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index f29bfa27..2825a87c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -69,9 +70,11 @@ class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; private static final int MAX_BATCH_SIZE = 3; - public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout"; + private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1)); + private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); + private final MessageRouterPublisher cut = new MessageRouterPublisherImpl( + httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); @@ -417,7 +420,10 @@ class MessageRouterPublisherImplTest { final List<String> plainMessage = List.of("I", "like", "cookies"); final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage); - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); // when final Flux<MessageRouterPublishResponse> responses = cut @@ -439,9 +445,12 @@ class MessageRouterPublisherImplTest { final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); given(httpClient.call(any(HttpRequest.class))) .willReturn(Mono.just(successHttpResponse)) .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + // when final Flux<MessageRouterPublishResponse> responses = cut .put(jsonPublishRequest, doubleJsonMessageBatch); @@ -463,9 +472,12 @@ class MessageRouterPublisherImplTest { final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); given(httpClient.call(any(HttpRequest.class))) .willReturn(Mono.error(ReadTimeoutException.INSTANCE)) .willReturn(Mono.just(successHttpResponse)); + // when final Flux<MessageRouterPublishResponse> responses = cut .put(jsonPublishRequest, doubleJsonMessageBatch); @@ -540,7 +552,7 @@ class MessageRouterPublisherImplTest { private void assertTimeoutError(MessageRouterPublishResponse response) { assertThat(response.failed()).isTrue(); assertThat(response.items()).isEmpty(); - assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); } private void verifySingleResponse(List<? extends JsonElement> threeMessages, diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 1f97001e..0396eff9 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -35,6 +35,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; @@ -47,10 +48,12 @@ import reactor.core.publisher.Mono; */ class MessageRouterSubscriberImplTest { + private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); + private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault(); private final MessageRouterSubscriber - cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance()); + cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() @@ -136,7 +139,10 @@ class MessageRouterSubscriberImplTest { void getWithProperRequest_shouldReturnTimeoutError() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); // when final Mono<MessageRouterSubscribeResponse> responses = cut @@ -145,7 +151,7 @@ class MessageRouterSubscriberImplTest { // then assertThat(response.failed()).isTrue(); - assertThat(response.failReason()).contains("408 Request Timeout"); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); assertThat(response.hasElements()).isFalse(); @@ -156,4 +162,4 @@ class MessageRouterSubscriberImplTest { mrRequest.consumerGroup(), mrRequest.consumerId())); assertThat(httpRequest.body()).isNull(); } -}
\ No newline at end of file +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java new file mode 100644 index 00000000..da3a88fe --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DmaapRetryConfigTest { + @Test + void shouldSuccessfullyCreateObject() { + DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(0) + .build(); + + assertThat(retryConfig.retryIntervalInSeconds()).isOne(); + assertThat(retryConfig.retryCount()).isZero(); + } + + @Test + void shouldSuccessfullyCreateObjectForDefaults() { + DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder().build(); + + assertThat(retryConfig.retryIntervalInSeconds()).isOne(); + assertThat(retryConfig.retryCount()).isEqualTo(3); + } + + @Test + void shouldThrowInvalidArgumentExceptionForInvalidRetryInterval() { + assertThrows(IllegalArgumentException.class, () -> withRetryInterval(0)); + assertThrows(IllegalArgumentException.class, () -> withRetryInterval(-3)); + } + + @Test + void shouldThrowInvalidArgumentExceptionForInvalidRetryCount() { + assertThrows(IllegalArgumentException.class, () -> withRetryCount(-1)); + assertThrows(IllegalArgumentException.class, () -> withRetryCount(-3)); + } + + private void withRetryInterval(int ri) { + ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(ri) + .build(); + } + + private void withRetryCount(int rc) { + ImmutableDmaapRetryConfig.builder() + .retryCount(rc) + .build(); + } +} diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml index ab6641cb..26eb1763 100644 --- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -67,11 +67,11 @@ services: - zookeeper - kafka - toxiproxy: - image: shopify/toxiproxy:2.1.4 + mockserver: + image: mockserver/mockserver:mockserver-5.11.2 + command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap ports: - - "8474:8474" - - "8666:8666" + - "1080:1090" networks: - net depends_on: |