From 9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 Mon Sep 17 00:00:00 2001 From: tkogut Date: Tue, 19 Jan 2021 09:00:56 +0100 Subject: Support retry in DCAE-SDK DMaaP-Client Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e --- pom.xml | 8 +- rest-services/dmaap-client/pom.xml | 15 +- .../dmaap/client/api/DmaapClientFactory.java | 41 +++- .../services/dmaap/client/error/ClientError.java | 30 --- .../client/error/ClientErrorReasonPresenter.java | 10 +- .../services/dmaap/client/error/RequestError.java | 30 --- .../dmaap/client/error/ServiceException.java | 38 --- .../dmaap/client/error/model/ClientError.java | 30 +++ .../dmaap/client/error/model/RequestError.java | 30 +++ .../dmaap/client/error/model/ServiceException.java | 38 +++ .../client/impl/MessageRouterPublisherImpl.java | 9 +- .../client/impl/MessageRouterSubscriberImpl.java | 20 +- .../services/dmaap/client/model/DmaapRequest.java | 4 +- .../model/config/DmaapClientConfiguration.java | 7 +- .../client/model/config/DmaapRetryConfig.java | 64 +++++ .../client/model/config/DmaapTimeoutConfig.java | 34 +++ .../dmaap/client/model/config/TimeoutConfig.java | 34 --- .../dmaap/client/MessageRouterTestsUtils.java | 6 +- .../services/dmaap/client/api/DMaapContainer.java | 11 +- .../dmaap/client/api/MessageRouterPublisherIT.java | 124 ++++++++-- .../client/api/MessageRouterPublisherTest.java | 148 +++++------- .../client/api/MessageRouterSubscriberIT.java | 150 +++++++++--- .../client/api/MessageRouterSubscriberTest.java | 149 +++++------- .../error/ClientErrorReasonPresenterTest.java | 6 +- .../impl/MessageRouterPublisherImplTest.java | 22 +- .../impl/MessageRouterSubscriberImplTest.java | 14 +- .../client/model/config/DmaapRetryConfigTest.java | 71 ++++++ .../dmaap-msg-router/message-router-compose.yml | 8 +- .../rest/services/adapters/http/RxHttpClient.java | 68 +++++- .../adapters/http/RxHttpClientFactory.java | 36 ++- .../services/adapters/http/config/RetryConfig.java | 59 +++++ .../adapters/http/config/RxHttpClientConfig.java | 29 +++ .../adapters/http/test/DummyHttpServer.java | 38 ++- .../services/adapters/http/RxHttpClientIT.java | 263 +++++++++++++++++++-- 34 files changed, 1182 insertions(+), 462 deletions(-) delete mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java delete mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java delete mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java delete mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java create mode 100644 rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java create mode 100644 rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java create mode 100644 rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java diff --git a/pom.xml b/pom.xml index 80b0c915..2c2ed16b 100644 --- a/pom.xml +++ b/pom.xml @@ -60,8 +60,8 @@ 11 - 5.3.1 - 5.3.1 + 5.7.0 + 5.7.0 1.3.1 2.7.5 3.12.2 @@ -74,11 +74,11 @@ 1.6 16.0.3 3.6.0.2 - 1.15.0 + 1.15.1 2.4.0 1.17.2 1.0.3 - 2.1.4 + 5.11.2 ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index d619590f..b8620311 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -55,6 +55,10 @@ org.junit.jupiter junit-jupiter-engine + + org.junit.jupiter + junit-jupiter-params + io.projectreactor reactor-test @@ -76,11 +80,10 @@ org.testcontainers junit-jupiter - - eu.rekawek.toxiproxy - toxiproxy-java - ${toxiproxy-java.version} - test - + + org.mock-server + mockserver-client-java + ${mockserver-client.version} + diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index 3c27da10..9d255559 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,17 +19,28 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import java.time.Duration; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES; + /** - * * @author Piotr Jaszczyk * @since 1.1.4 */ @@ -44,19 +55,37 @@ public final class DmaapClientFactory { return new MessageRouterPublisherImpl( createHttpClient(clientConfiguration), clientConfiguration.maxBatchSize(), - clientConfiguration.maxBatchDuration()); + clientConfiguration.maxBatchDuration(), + new ClientErrorReasonPresenter()); } public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber( @NotNull MessageRouterSubscriberConfig clientConfiguration) { return new MessageRouterSubscriberImpl( createHttpClient(clientConfiguration), - clientConfiguration.gsonInstance()); + clientConfiguration.gsonInstance(), + new ClientErrorReasonPresenter()); } private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) { + RxHttpClientConfig clientConfig = ImmutableRxHttpClientConfig.builder() + .retryConfig(createRetry(config)) + .build(); return config.securityKeys() == null - ? RxHttpClientFactory.create() - : RxHttpClientFactory.create(config.securityKeys()); + ? RxHttpClientFactory.create(clientConfig) + : RxHttpClientFactory.create(config.securityKeys(), clientConfig); + } + + private static RetryConfig createRetry(DmaapClientConfiguration config) { + return Option.of(config.retryConfig()) + .map(rc -> ImmutableRetryConfig.builder() + .retryInterval(Duration.ofSeconds(rc.retryIntervalInSeconds())) + .retryCount(rc.retryCount()) + .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES) + .customRetryableExceptions(RETRYABLE_EXCEPTIONS) + .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION) + .build()) + .getOrNull(); } } + diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java deleted file mode 100644 index 57187c80..00000000 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; - -import org.immutables.gson.Gson; -import org.immutables.value.Value; - -@Value.Immutable -@Gson.TypeAdapters -public interface ClientError { - RequestError requestError(); -} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java index 6b22b378..1eaae78e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,15 +23,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ClientError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableClientError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableRequestError; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableServiceException; public class ClientErrorReasonPresenter { - private ClientErrorReasonPresenter() { } - private static final Gson GSON = new GsonBuilder().create(); private static final String PATTERN = "%s\n%s"; - public static String present(ClientErrorReason clientErrorReason) { + public String present(ClientErrorReason clientErrorReason) { ImmutableServiceException simpleServiceException = ImmutableServiceException.builder() .messageId(clientErrorReason.messageId()) .text(clientErrorReason.text()) diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java deleted file mode 100644 index 71e673fe..00000000 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; - -import org.immutables.gson.Gson; -import org.immutables.value.Value; - -@Value.Immutable -@Gson.TypeAdapters -public interface RequestError { - ServiceException serviceException(); -} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java deleted file mode 100644 index e99330ac..00000000 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error; - -import org.immutables.gson.Gson; -import org.immutables.value.Value; -import reactor.util.annotation.Nullable; - -import java.util.List; - -@Value.Immutable -@Gson.TypeAdapters -public interface ServiceException { - - String messageId(); - - String text(); - - @Nullable List variables(); -} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java new file mode 100644 index 00000000..d0cb35da --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020-2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface ClientError { + RequestError requestError(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java new file mode 100644 index 00000000..79b9a299 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020-2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +@Value.Immutable +@Gson.TypeAdapters +public interface RequestError { + ServiceException serviceException(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java new file mode 100644 index 00000000..a39fbc0b --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2020-2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import reactor.util.annotation.Nullable; + +import java.util.List; + +@Value.Immutable +@Gson.TypeAdapters +public interface ServiceException { + + String messageId(); + + String text(); + + @Nullable List variables(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 16068da0..7d1b0a93 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,12 +61,15 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final RxHttpClient httpClient; private final int maxBatchSize; private final Duration maxBatchDuration; + private final ClientErrorReasonPresenter clientErrorReasonPresenter; + private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); - public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) { + public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) { this.httpClient = httpClient; this.maxBatchSize = maxBatchSize; this.maxBatchDuration = maxBatchDuration; + this.clientErrorReasonPresenter = clientErrorReasonPresenter; } @Override @@ -124,7 +127,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { } private Mono createErrorResponse(ClientErrorReason clientErrorReason) { - String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterPublishResponse.builder() .failReason(failReason) .build()); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index f7ccf4f2..292a7157 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -20,16 +20,12 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; - import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.List; -import java.nio.charset.StandardCharsets; - import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -48,6 +44,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.nio.charset.StandardCharsets; + +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; + /** * @author Piotr Jaszczyk * @since March 2019 @@ -55,11 +55,14 @@ import reactor.core.publisher.Mono; public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { private final RxHttpClient httpClient; private final Gson gson; + private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class); - public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson) { + public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson, + ClientErrorReasonPresenter clientErrorReasonPresenter) { this.httpClient = httpClient; this.gson = gson; + this.clientErrorReasonPresenter = clientErrorReasonPresenter; } @Override @@ -67,7 +70,8 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { LOGGER.debug("Requesting new items from DMaaP MR: {}", request); return httpClient.call(buildGetHttpRequest(request)) .map(this::buildGetResponse) - .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) + .doOnError(ReadTimeoutException.class, + e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT)); } @@ -91,7 +95,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { : builder.failReason(extractFailReason(httpResponse)).build(); } - private List getAsJsonElements(HttpResponse httpResponse){ + private List getAsJsonElements(HttpResponse httpResponse) { JsonArray bodyAsJsonArray = httpResponse .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class); @@ -104,7 +108,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { } private Mono createErrorResponse(ClientErrorReason clientErrorReason) { - String failReason = ClientErrorReasonPresenter.present(clientErrorReason); + String failReason = clientErrorReasonPresenter.present(clientErrorReason); return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() .failReason(failReason) .build()); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java index 95c5e7d1..a5a87fbd 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java @@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** @@ -35,5 +35,5 @@ public interface DmaapRequest { return RequestDiagnosticContext.create(); } - @Nullable TimeoutConfig timeoutConfig(); + @Nullable DmaapTimeoutConfig timeoutConfig(); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java index ac677f02..3e283511 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,5 +32,8 @@ public interface DmaapClientConfiguration { default @Nullable SecurityKeys securityKeys() { return null; } - + @Value.Default + default @Nullable DmaapRetryConfig retryConfig(){ + return null; + } } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java new file mode 100644 index 00000000..f82edfc9 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.collection.HashSet; +import io.vavr.collection.Set; +import org.immutables.value.Value; + +import java.net.ConnectException; + +@Value.Immutable +public interface DmaapRetryConfig { + + Set> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class); + RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE; + Set RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504); + + @Value.Default + default int retryCount() { + return 3; + } + + @Value.Default + default int retryIntervalInSeconds() { + return 1; + } + + @Value.Check + default void validate() { + validateRetryCount(); + validateRetryInterval(); + } + + private void validateRetryCount() { + int rc = retryCount(); + if (rc < 0) + throw new IllegalArgumentException(String.format("Invalid value: %d, retryCount should be (0-n)", rc)); + } + + private void validateRetryInterval() { + long ri = retryIntervalInSeconds(); + if (ri < 1) + throw new IllegalArgumentException(String.format("Invalid value: %d, retryInterval should be (1-n)", ri)); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java new file mode 100644 index 00000000..0ece899b --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import org.immutables.value.Value; + +import java.time.Duration; + +@Value.Immutable +public interface DmaapTimeoutConfig { + + @Value.Default + default Duration getTimeout() { + return Duration.ofSeconds(4); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java deleted file mode 100644 index 413bf8e5..00000000 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; - -import org.immutables.value.Value; - -import java.time.Duration; - -@Value.Immutable -public interface TimeoutConfig { - - @Value.Default - default Duration getTimeout() { - return Duration.ofSeconds(4); - } -} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index 2b8027c1..1a315806 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -38,7 +38,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import reactor.core.publisher.Flux; import java.time.Duration; @@ -56,7 +56,7 @@ public final class MessageRouterTestsUtils { return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(createMessageRouterSink(topicUrl)) .contentType(ContentType.APPLICATION_JSON) - .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder() .timeout(timeout) .build()) .build(); @@ -86,7 +86,7 @@ public final class MessageRouterTestsUtils { return ImmutableMessageRouterSubscribeRequest .builder() - .timeoutConfig(ImmutableTimeoutConfig.builder() + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder() .timeout(timeout) .build()) .sourceDefinition(getImmutableMessageRouterSource(topicUrl)) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java index 494ca62a..5b1984df 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,11 +27,10 @@ import java.net.URL; final class DMaapContainer { private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml"; - private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath( - MR_COMPOSE_RESOURCE_NAME); + private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME); static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; static final String DMAAP_SERVICE_NAME = "dmaap"; - static final int PROXY_SERVICE_EXPOSED_PORT = 8666; + static final int PROXY_MOCK_SERVICE_EXPOSED_PORT = 1080; static final String LOCALHOST = "localhost"; private DMaapContainer() {} @@ -43,11 +42,11 @@ final class DMaapContainer { .withLocalCompose(true); } - private static String getDockerComposeFilePath(String resourceName){ + private static String getDockerComposeFilePath(String resourceName) { URL resource = DMaapContainer.class.getClassLoader() .getResource(resourceName); - if(resource != null) return resource.getFile(); + if (resource != null) return resource.getFile(); else throw new DockerComposeNotFoundException(String .format("File %s does not exist", resourceName)); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index 24cd2c34..f6ef94b7 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,16 +22,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import eu.rekawek.toxiproxy.Proxy; -import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.TimeToLive; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.testcontainers.containers.DockerComposeContainer; @@ -41,11 +46,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse; @@ -56,14 +61,19 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; @Testcontainers class MessageRouterPublisherIT { @Container - private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); + private static final DockerComposeContainer CONTAINER = createContainerInstance(); + private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient( + LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + private static String EVENTS_PATH; + private static String PROXY_MOCK_EVENTS_PATH; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n" + "{" @@ -85,22 +95,21 @@ class MessageRouterPublisherIT { + "}" + "}" + "}"; - private static Proxy DMAAP_PROXY; - private static String EVENTS_PATH; - private static String PROXY_EVENTS_PATH; + private final MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); private final MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); @BeforeAll - static void setUp() throws IOException { + static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); - PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); + PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + } - DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", - String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), - String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + @BeforeEach + void set() { + MOCK_SERVER_CLIENT.reset(); } @Test @@ -302,17 +311,18 @@ class MessageRouterPublisherIT { } @Test - void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException { + void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { //given - final String toxic = "latency-toxic"; - DMAAP_PROXY.toxics() - .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); final String topic = "TOPIC10"; final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); final Flux messageBatch = jsonBatch(singleJsonMessage); final MessageRouterPublishRequest mrRequest = createPublishRequest( - String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1)); + String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1)); final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE); + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 2)); //when final Flux result = publisher.put(mrRequest, messageBatch); @@ -323,7 +333,75 @@ class MessageRouterPublisherIT { .expectComplete() .verify(TIMEOUT); - //cleanup - DMAAP_PROXY.toxics().get(toxic).remove(); + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); + } + + @Test + void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() { + final String topic = "TOPIC11"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List expectedItems = getAsJsonElements(singleJsonMessage); + final Flux plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + + //when + final Flux result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + @Test + void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() { + final String topic = "TOPIC12"; + final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + + final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List expectedItems = getAsJsonElements(singleJsonMessage); + final Flux plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1)); + final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems); + + final String path = String.format("/events/%s", topic); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 10)); + final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig()); + + //when + final Flux result = publisher.put(publishRequest, plainBatch); + + //then + StepVerifier.create(result) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + private MessageRouterPublisherConfig retryConfig() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(1) + .build()) + .build(); } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index b0a07eda..82b6661c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,18 @@ import com.google.gson.JsonPrimitive; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,8 +44,10 @@ import reactor.test.StepVerifier; import java.time.Duration; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; /** * @author Piotr Jaszczyk @@ -50,12 +56,14 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC"; + private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY"; private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400"; private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401"; private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403"; private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404"; - private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500"; + private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500"; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final Flux messageBatch = Flux.just("ala", "ma", "kota") .map(JsonPrimitive::new); @@ -69,29 +77,22 @@ class MessageRouterPublisherTest { @BeforeAll static void setUp() { server = DummyHttpServer.start(routes -> routes - .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> - sendString(resp, Mono.just("OK"))) - .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> - sendError(resp, 400, ERROR_MESSAGE)) - .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> - sendError(resp, 401, ERROR_MESSAGE)) - .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> - sendError(resp, 403, ERROR_MESSAGE)) - .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> - sendError(resp, 404, ERROR_MESSAGE)) - .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) -> - sendError(resp, 500, ERROR_MESSAGE)) + .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) + .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) + .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE)) + .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE)) + .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE)) + .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE)) + .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)) ); } @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH, - ContentType.TEXT_PLAIN); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH); final List expectedItems = messageBatchItems.map(JsonPrimitive::new); - //when final Flux result = sut.put(mrRequest, messageBatch); @@ -102,13 +103,18 @@ class MessageRouterPublisherTest { .verify(TIMEOUT); } - @Test - void publisher_shouldHandleBadRequestError() { + @ParameterizedTest + @CsvSource({ + FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request", + FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized", + FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden", + FAILING_WITH_404_RESP_PATH + "," + "404 Not Found", + FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error" + }) + void publisher_shouldHandleError(String failingPath, String failReason) { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "400 Bad Request\n%s", ERROR_MESSAGE); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath); + final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason); //when final Flux result = sut.put(mrRequest, messageBatch); @@ -121,83 +127,40 @@ class MessageRouterPublisherTest { } @Test - void publisher_shouldHandleUnauthorizedError() { + void publisher_shouldHandleClientTimeoutError() { //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "401 Unauthorized\n%s", ERROR_MESSAGE); + final Duration requestTimeout = Duration.ofMillis(1); + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(DELAY_RESP_TOPIC_PATH, requestTimeout); //when final Flux result = sut.put(mrRequest, messageBatch); //then StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void publisher_shouldHandleForbiddenError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "403 Forbidden\n%s", ERROR_MESSAGE); - - //when - final Flux result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) + .consumeNextWith(this::assertTimeoutError) .expectComplete() .verify(TIMEOUT); } - @Test - void publisher_shouldHandleNotFoundError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "404 Not Found\n%s", ERROR_MESSAGE); - - //when - final Flux result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .build(); } - @Test - void publisher_shouldHandleInternalServerError() { - //given - final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH, - ContentType.TEXT_PLAIN); - final MessageRouterPublishResponse expectedResponse = createErrorResponse( - "500 Internal Server Error\n%s", ERROR_MESSAGE); - - //when - final Flux result = sut - .put(mrRequest, messageBatch); - - //then - StepVerifier.create(result) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); + private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) { + final MessageRouterSink sinkDefinition = createMRSink(topicPath); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build()) + .build(); } - - private MessageRouterPublishRequest createMRRequest(String topicPath, ContentType contentType) { - final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + private static MessageRouterSink createMRSink(String topicPath) { + return ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("http://%s:%d%s", server.host(), @@ -205,18 +168,19 @@ class MessageRouterPublisherTest { topicPath) ) .build(); - - return ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) - .contentType(contentType) - .build(); } - private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs) { + private static MessageRouterPublishResponse createErrorResponse(String failReason) { + String failReasonFormat = failReason + "\n%s"; return ImmutableMessageRouterPublishResponse .builder() - .failReason(String.format(failReasonFormat, formatArgs)) + .failReason(String.format(failReasonFormat, ERROR_MESSAGE)) .build(); } + + private void assertTimeoutError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index bd161aab..1f4e499d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -22,14 +22,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import eu.rekawek.toxiproxy.Proxy; -import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.testcontainers.containers.DockerComposeContainer; @@ -39,11 +43,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse; @@ -52,19 +56,23 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; - import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; @Testcontainers class MessageRouterSubscriberIT { + @Container + private static final DockerComposeContainer CONTAINER = createContainerInstance(); + private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient( + LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + private static String EVENTS_PATH; + private static String PROXY_MOCK_EVENTS_PATH; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static String PROXY_EVENTS_PATH; - private static Proxy DMAAP_PROXY; private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + "{" + "\"mrstatus\":3001," + @@ -85,27 +93,21 @@ class MessageRouterSubscriberIT { + "}" + "}"; - @Container - private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); - - private static String EVENTS_PATH; - private MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); private MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - @BeforeAll - static void setUp() throws IOException { + static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); - PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); - - DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", - String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), - String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); } + @BeforeEach + void set() { + MOCK_SERVER_CLIENT.reset(); + } @Test void subscriber_shouldHandleNoSuchTopicException() { @@ -128,7 +130,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriberShouldHandleSingleItemResponse(){ + void subscriberShouldHandleSingleItemResponse() { //given final String topic = "TOPIC"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -207,7 +209,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldSubscribeToTopic(){ + void subscriber_shouldSubscribeToTopic() { //given final String topic = "TOPIC4"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -234,17 +236,16 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldHandleTimeoutException() throws IOException { + void subscriber_shouldHandleTimeoutException() { //given - final String topic = "newTopic"; + final String topic = "TOPIC5"; final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest( - String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); - final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( - TIMEOUT_ERROR_MESSAGE); - - final String toxic = "latency-toxic"; - DMAAP_PROXY.toxics() - .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); + String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE); + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 5)); //when Mono response = subscriber @@ -256,7 +257,88 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); - //cleanup - DMAAP_PROXY.toxics().get(toxic).remove(); + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); + } + + @Test + void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC6"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID); + + final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List expectedItems = getAsJsonElements(singleJsonMessage); + final Flux jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + @Test + void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC7"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + + final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List expectedItems = getAsJsonElements(singleJsonMessage); + final Flux jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 10)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + private MessageRouterSubscriberConfig retryConfig() { + return ImmutableMessageRouterSubscriberConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(1) + .build()) + .build(); } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index 18584789..06875394 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,30 +20,34 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; - import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; - -import java.time.Duration; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.netty.http.server.HttpServerRoutes; import reactor.test.StepVerifier; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; + /** * @author Piotr Jaszczyk * @since May 2019 @@ -51,8 +55,10 @@ import reactor.test.StepVerifier; class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout"; private static final String CONSUMER_GROUP = "group1"; private static final String SUCCESS_CONSUMER_ID = "consumer200"; + private static final String DELAY_CONSUMER_ID = "delay200"; private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401"; private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403"; private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409"; @@ -63,6 +69,8 @@ class MessageRouterSubscriberTest { private static final String SUCCESS_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID); + private static final String DELAY_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, DELAY_CONSUMER_ID); private static final String FAILING_WITH_401_RESP_PATH = String .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID); private static final String FAILING_WITH_403_RESP_PATH = String @@ -83,7 +91,15 @@ class MessageRouterSubscriberTest { @BeforeAll static void setUp() { - DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberTest::setRoutes); + DummyHttpServer server = DummyHttpServer.start(routes -> routes + .get(SUCCESS_RESP_PATH, (req, resp) -> + sendResource(resp, "/sample-mr-subscribe-response.json")) + .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT)) + .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE)) + .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE)) + .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE)) + .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) + .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); sourceDefinition = createMessageRouterSource(server); @@ -110,69 +126,19 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } - @Test - void subscriber_shouldGetUnauthorizedErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID); - Mono response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("401 Unauthorized\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetForbiddenErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID); - Mono response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("403 Forbidden\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetConflictErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID); + @ParameterizedTest + @CsvSource({ + FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized", + FAILING_WITH_403_CONSUMER_ID + "," + "403 Forbidden", + FAILING_WITH_409_CONSUMER_ID + "," + "409 Conflict", + FAILING_WITH_429_CONSUMER_ID + "," + "429 Too Many Requests", + FAILING_WITH_500_CONSUMER_ID + "," + "500 Internal Server Error" + }) + void subscriber_shouldHandleError(String consumerId, String failReason) { + MessageRouterSubscribeRequest request = createFailingRequest(consumerId); Mono response = sut.get(request); - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("409 Conflict\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetTooManyRequestsErrorResponse() { - MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID); - Mono response = sut.get(request); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("429 Too Many Requests\n%s", ERROR_MESSAGE)); - - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldGetInternalServerErrorResponse() { - Mono response = sut - .get(mrFailingRequest); - - MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String - .format("500 Internal Server Error\n%s", ERROR_MESSAGE)); + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(failReason); StepVerifier.create(response) .expectNext(expectedResponse) @@ -226,20 +192,16 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } - private static HttpServerRoutes setRoutes(HttpServerRoutes routes) { - return routes - .get(SUCCESS_RESP_PATH, (req, resp) -> - sendResource(resp, "/sample-mr-subscribe-response.json")) - .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> - sendError(resp, 401, ERROR_MESSAGE)) - .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> - sendError(resp, 403, ERROR_MESSAGE)) - .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> - sendError(resp, 409, ERROR_MESSAGE)) - .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> - sendError(resp, 429, ERROR_MESSAGE)) - .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> - sendError(resp, 500, ERROR_MESSAGE)); + @Test + void subscriber_shouldHandleClientTimeoutError() { + Duration requestTimeout = Duration.ofMillis(1); + MessageRouterSubscribeRequest request = createDelayRequest(DELAY_CONSUMER_ID, requestTimeout); + Mono response = sut.get(request); + + StepVerifier.create(response) + .consumeNextWith(this::assertTimeoutError) + .expectComplete() + .verify(TIMEOUT); } private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { @@ -257,6 +219,15 @@ class MessageRouterSubscriberTest { .build(); } + private static MessageRouterSubscribeRequest createDelayRequest(String consumerId, Duration timeout) { + return ImmutableMessageRouterSubscribeRequest.builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(CONSUMER_GROUP) + .consumerId(consumerId) + .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build()) + .build(); + } + private static MessageRouterSubscribeRequest createFailingRequest(String consumerId) { return ImmutableMessageRouterSubscribeRequest .builder() @@ -266,11 +237,17 @@ class MessageRouterSubscriberTest { .build(); } - private static MessageRouterSubscribeResponse createErrorResponse(String failReason) { + private MessageRouterSubscribeResponse createErrorResponse(String failReason) { + String failReasonFormat = failReason + "\n%s"; return ImmutableMessageRouterSubscribeResponse .builder() - .failReason(failReason) + .failReason(String.format(failReasonFormat, ERROR_MESSAGE)) .build(); } + + private void assertTimeoutError(DmaapResponse response) { + assertThat(response.failed()).isTrue(); + assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java index 9b318d73..76d7a381 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2020-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ class ClientErrorReasonPresenterTest { + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}"; //when - String actual = ClientErrorReasonPresenter.present(clientErrorReason); + String actual = new ClientErrorReasonPresenter().present(clientErrorReason); //then assertThat(actual).isEqualTo(expected); @@ -48,7 +48,7 @@ class ClientErrorReasonPresenterTest { + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}"; //when - String actual = ClientErrorReasonPresenter.present(clientErrorReason); + String actual = new ClientErrorReasonPresenter().present(clientErrorReason); //then assertThat(actual).isEqualTo(expected); diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index f29bfa27..2825a87c 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -69,9 +70,11 @@ class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; private static final int MAX_BATCH_SIZE = 3; - public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout"; + private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1)); + private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); + private final MessageRouterPublisher cut = new MessageRouterPublisherImpl( + httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); private final ArgumentCaptor httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); @@ -417,7 +420,10 @@ class MessageRouterPublisherImplTest { final List plainMessage = List.of("I", "like", "cookies"); final Flux plainMessagesMaxBatch = plainBatch(plainMessage); - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); // when final Flux responses = cut @@ -439,9 +445,12 @@ class MessageRouterPublisherImplTest { final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); final Flux doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); given(httpClient.call(any(HttpRequest.class))) .willReturn(Mono.just(successHttpResponse)) .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + // when final Flux responses = cut .put(jsonPublishRequest, doubleJsonMessageBatch); @@ -463,9 +472,12 @@ class MessageRouterPublisherImplTest { final List parsedTwoMessages = getAsJsonObjects(twoJsonMessages); final Flux doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); given(httpClient.call(any(HttpRequest.class))) .willReturn(Mono.error(ReadTimeoutException.INSTANCE)) .willReturn(Mono.just(successHttpResponse)); + // when final Flux responses = cut .put(jsonPublishRequest, doubleJsonMessageBatch); @@ -540,7 +552,7 @@ class MessageRouterPublisherImplTest { private void assertTimeoutError(MessageRouterPublishResponse response) { assertThat(response.failed()).isTrue(); assertThat(response.items()).isEmpty(); - assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); } private void verifySingleResponse(List threeMessages, diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 1f97001e..0396eff9 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -35,6 +35,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; @@ -47,10 +48,12 @@ import reactor.core.publisher.Mono; */ class MessageRouterSubscriberImplTest { + private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); + private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault(); private final MessageRouterSubscriber - cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance()); + cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); private final ArgumentCaptor httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() @@ -136,7 +139,10 @@ class MessageRouterSubscriberImplTest { void getWithProperRequest_shouldReturnTimeoutError() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE)); + given(clientErrorReasonPresenter.present(any())) + .willReturn(ERROR_MESSAGE); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.error(ReadTimeoutException.INSTANCE)); // when final Mono responses = cut @@ -145,7 +151,7 @@ class MessageRouterSubscriberImplTest { // then assertThat(response.failed()).isTrue(); - assertThat(response.failReason()).contains("408 Request Timeout"); + assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE); assertThat(response.hasElements()).isFalse(); @@ -156,4 +162,4 @@ class MessageRouterSubscriberImplTest { mrRequest.consumerGroup(), mrRequest.consumerId())); assertThat(httpRequest.body()).isNull(); } -} \ No newline at end of file +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java new file mode 100644 index 00000000..da3a88fe --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DmaapRetryConfigTest { + @Test + void shouldSuccessfullyCreateObject() { + DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(0) + .build(); + + assertThat(retryConfig.retryIntervalInSeconds()).isOne(); + assertThat(retryConfig.retryCount()).isZero(); + } + + @Test + void shouldSuccessfullyCreateObjectForDefaults() { + DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder().build(); + + assertThat(retryConfig.retryIntervalInSeconds()).isOne(); + assertThat(retryConfig.retryCount()).isEqualTo(3); + } + + @Test + void shouldThrowInvalidArgumentExceptionForInvalidRetryInterval() { + assertThrows(IllegalArgumentException.class, () -> withRetryInterval(0)); + assertThrows(IllegalArgumentException.class, () -> withRetryInterval(-3)); + } + + @Test + void shouldThrowInvalidArgumentExceptionForInvalidRetryCount() { + assertThrows(IllegalArgumentException.class, () -> withRetryCount(-1)); + assertThrows(IllegalArgumentException.class, () -> withRetryCount(-3)); + } + + private void withRetryInterval(int ri) { + ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(ri) + .build(); + } + + private void withRetryCount(int rc) { + ImmutableDmaapRetryConfig.builder() + .retryCount(rc) + .build(); + } +} diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml index ab6641cb..26eb1763 100644 --- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -67,11 +67,11 @@ services: - zookeeper - kafka - toxiproxy: - image: shopify/toxiproxy:2.1.4 + mockserver: + image: mockserver/mockserver:mockserver-5.11.2 + command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap ports: - - "8474:8474" - - "8666:8666" + - "1080:1090" networks: - net depends_on: diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java index 77b842d7..d0bdf414 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,25 @@ * limitations under the License. * ============LICENSE_END===================================== */ + package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vavr.collection.HashSet; import io.vavr.collection.Stream; import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient.ResponseReceiver; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; import java.util.stream.Collectors; @@ -39,17 +46,23 @@ public class RxHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); private final HttpClient httpClient; + private RetryConfig retryConfig; RxHttpClient(HttpClient httpClient) { this.httpClient = httpClient; } + RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) { + this(httpClient); + this.retryConfig = retryConfig; + } + public Mono call(HttpRequest request) { - return prepareRequest(request) - .responseSingle((resp, content) -> - content.asByteArray() - .defaultIfEmpty(new byte[0]) - .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes))); + Mono httpResponseMono = response(request); + return Option.of(retryConfig) + .map(rc -> retryConfig(rc, request.diagnosticContext())) + .map(httpResponseMono::retryWhen) + .getOrElse(() -> httpResponseMono); } ResponseReceiver prepareRequest(HttpRequest request) { @@ -65,6 +78,27 @@ public class RxHttpClient { return prepareBody(request, theClient); } + private Mono response(HttpRequest request) { + return prepareRequest(request) + .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content)); + } + + private Mono mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) { + if (shouldRetry(status.code())) { + return Mono.error(new RetryConfig.RetryableException()); + } + return content.asByteArray() + .defaultIfEmpty(new byte[0]) + .map(bytes -> new NettyHttpResponse(url, status, bytes)); + } + + private boolean shouldRetry(int code) { + return Option.of(retryConfig) + .map(RetryConfig::retryableHttpResponseCodes) + .getOrElse(HashSet::empty) + .contains(code); + } + private ResponseReceiver prepareBody(HttpRequest request, HttpClient theClient) { if (request.body() == null) { return prepareBodyWithoutContents(request, theClient); @@ -79,7 +113,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED)) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -87,7 +121,7 @@ public class RxHttpClient { return theClient .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString())) .request(request.method().asNetty()) - .send(request.body().contents()) + .send(Flux.from(request.body().contents())) .uri(request.url()); } @@ -114,4 +148,22 @@ public class RxHttpClient { context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); } + + private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) { + RetryBackoffSpec retry = Retry + .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval()) + .doBeforeRetry(retrySignal -> context.withSlf4jMdc( + LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal))) + .filter(ex -> isRetryable(retryConfig, ex)); + + return Option.of(retryConfig.onRetryExhaustedException()) + .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex)) + .getOrElse(retry); + } + + private boolean isRetryable(RetryConfig retryConfig, Throwable ex) { + return retryConfig.retryableExceptions() + .toStream() + .exists(clazz -> clazz.isAssignableFrom(ex.getClass())); + } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java index 9b23f1d9..118df52b 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.netty.handler.ssl.SslContext; +import io.vavr.control.Option; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig; import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys; @@ -42,23 +44,53 @@ public final class RxHttpClientFactory { return new RxHttpClient(HttpClient.create()); } + public static RxHttpClient create(RxHttpClientConfig config) { + return createWithConfig(HttpClient.create(), config); + } public static RxHttpClient create(SecurityKeys securityKeys) { final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys); return create(context); } + public static RxHttpClient create(SecurityKeys securityKeys, RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys); + return create(context, config); + } + public static RxHttpClient create(TrustStoreKeys trustStoreKeys) { final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys); return create(context); } + public static RxHttpClient create(TrustStoreKeys trustStoreKeys, RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys); + return create(context, config); + } + public static RxHttpClient createInsecure() { final SslContext context = SSL_FACTORY.createInsecureClientContext(); return create(context); } + public static RxHttpClient createInsecure(RxHttpClientConfig config) { + final SslContext context = SSL_FACTORY.createInsecureClientContext(); + return create(context, config); + } + private static RxHttpClient create(@NotNull SslContext sslContext) { - return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); + HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)); + return new RxHttpClient(secure); + } + + private static RxHttpClient create(@NotNull SslContext sslContext, RxHttpClientConfig config) { + HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)); + return createWithConfig(secure, config); + } + + private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) { + return Option.of(config.retryConfig()) + .map(retryConfig -> new RxHttpClient(httpClient, retryConfig)) + .getOrElse(() -> new RxHttpClient(httpClient)); } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java new file mode 100644 index 00000000..a0ae1991 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config; + +import io.vavr.collection.HashSet; +import io.vavr.collection.Set; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +import java.time.Duration; + +@Value.Immutable +public interface RetryConfig { + + int retryCount(); + + Duration retryInterval(); + + @Value.Default + default Set retryableHttpResponseCodes() { + return HashSet.empty(); + } + + @Value.Default + default Set> customRetryableExceptions() { + return HashSet.empty(); + } + + @Value.Derived + default Set> retryableExceptions() { + Set> result = customRetryableExceptions(); + if (retryableHttpResponseCodes().nonEmpty()) { + result = result.add(RetryableException.class); + } + return result; + } + + @Nullable RuntimeException onRetryExhaustedException(); + + class RetryableException extends RuntimeException {} +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java new file mode 100644 index 00000000..78a88a47 --- /dev/null +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2021 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +@Value.Immutable +public interface RxHttpClientConfig { + @Nullable RetryConfig retryConfig(); +} diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java index 4795b00f..8ac0d1d5 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test; import io.vavr.CheckedFunction0; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; +import io.vavr.Tuple3; +import io.vavr.control.Try; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +32,13 @@ import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerResponse; import reactor.netty.http.server.HttpServerRoutes; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + /** * @author Piotr Jaszczyk * @since February 2019 @@ -63,11 +67,26 @@ public class DummyHttpServer { return responses[state.getAndIncrement()]; } + public static Publisher sendInOrderWithDelay(AtomicInteger counter, Tuple3... responses) { + Tuple3 tuple = responses[counter.get()]; + HttpServerResponse httpServerResponse = tuple._1; + Integer statusCode = tuple._2; + long timeout = tuple._3.toMillis(); + Try.run(() -> Thread.sleep(timeout)); + counter.incrementAndGet(); + return sendString(httpServerResponse.status(statusCode), Mono.just("OK")); + } + + public static Publisher sendWithDelay(HttpServerResponse response, int statusCode, Duration timeout) { + Try.run(() -> Thread.sleep(timeout.toMillis())); + return sendString(response.status(statusCode), Mono.just("OK")); + } + public static Publisher sendResource(HttpServerResponse httpServerResponse, String resourcePath) { return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath))); } - public static Publisher sendError(HttpServerResponse httpServerResponse, int statusCode, String message){ + public static Publisher sendError(HttpServerResponse httpServerResponse, int statusCode, String message) { return sendString(httpServerResponse.status(statusCode), Mono.just(message)); } @@ -79,6 +98,11 @@ public class DummyHttpServer { server.disposeNow(); } + public DummyHttpServer closeAndGet() { + server.disposeNow(); + return this; + } + public String host() { return server.host(); } diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java index 6f3a0909..daf04c6e 100644 --- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java +++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -2,7 +2,7 @@ * ============LICENSE_START==================================== * DCAEGEN2-SERVICES-SDK * ========================================================= - * Copyright (C) 2019-2020 Nokia. All rights reserved. + * Copyright (C) 2019-2021 Nokia. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,33 +22,55 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.timeout.ReadTimeoutException; +import io.vavr.Tuple; +import io.vavr.collection.HashSet; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; import java.net.MalformedURLException; import java.net.URL; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; class RxHttpClientIT { private static final Duration TIMEOUT = Duration.ofHours(5); - private final RxHttpClient cut = RxHttpClientFactory.create(); - private static DummyHttpServer httpServer; - - @BeforeAll - static void setUpClass() { - httpServer = DummyHttpServer.start(routes -> routes - .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) - .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1)))) + private static final Duration NO_DELAY = Duration.ofSeconds(0); + private static final int RETRY_COUNT = 1; + private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1; + private static final DummyHttpServer HTTP_SERVER = initialize(); + private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); + private static final Mono OK = Mono.just("OK"); + private static final Duration RETRY_INTERVAL = Duration.ofMillis(1); + private static AtomicInteger REQUEST_COUNTER; + + private static DummyHttpServer initialize() { + return DummyHttpServer.start(routes -> routes + .get("/sample-get", (req, resp) -> sendString(resp, OK)) + .get("/delay-get", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3)))) .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) + .get("/retry-get-500", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, + Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY))) + .get("/retry-get-400", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY))) + .get("/retry-get-500-200", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, + Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY))) + .get("/retry-get-200", (req, resp) -> + sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY))) .post("/headers-post", (req, resp) -> resp .sendString(Mono.just(req.requestHeaders().toString()))) .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) @@ -57,12 +79,7 @@ class RxHttpClientIT { @AfterAll static void tearDownClass() { - httpServer.close(); - } - - private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException { - return ImmutableHttpRequest.builder() - .url(new URL("http", httpServer.host(), httpServer.port(), path).toString()); + HTTP_SERVER.close(); } @Test @@ -71,6 +88,7 @@ class RxHttpClientIT { final HttpRequest httpRequest = requestFor("/sample-get") .method(HttpMethod.GET) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -90,6 +108,7 @@ class RxHttpClientIT { final HttpRequest httpRequest = requestFor("/sample-get-500") .method(HttpMethod.GET) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -110,6 +129,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.fromString(requestBody)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -131,6 +151,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.chunkedFromString(Mono.just(requestBody))) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -155,6 +176,7 @@ class RxHttpClientIT { .method(HttpMethod.POST) .body(RequestBody.fromString(requestBody)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(); // when final Mono bodyAsString = cut.call(httpRequest) @@ -174,10 +196,12 @@ class RxHttpClientIT { @Test void getWithTimeoutError() throws Exception { // given - final HttpRequest httpRequest = requestFor("/delayed-get") + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/delay-get") .method(HttpMethod.GET) - .timeout(Duration.ofSeconds(1)) + .timeout(Duration.ofMillis(1)) .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build()); // when final Mono response = cut.call(httpRequest); @@ -186,5 +210,208 @@ class RxHttpClientIT { StepVerifier.create(response) .expectError(ReadTimeoutException.class) .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestForClosedServer("/sample-get") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .customRetryableExceptions(HashSet.of(ConnectException.class)) + .build()) + .build()); + + // when + final Mono response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestForClosedServer("/sample-get") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .customRetryableExceptions(HashSet.of(ConnectException.class)) + .onRetryExhaustedException(ReadTimeoutException.INSTANCE) + .build()) + .build()); + + // when + final Mono response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(ReadTimeoutException.class) + .verify(TIMEOUT); + assertNoServerResponse(); + } + + @Test + void getWithRetryExhaustedExceptionWhen500() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithCustomRetryExhaustedExceptionWhen500() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .onRetryExhaustedException(ReadTimeoutException.INSTANCE) + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono response = cut.call(httpRequest); + + // then + StepVerifier.create(response) + .expectError(ReadTimeoutException.class) + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithRetryWhen500AndThen200() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-500-200") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString) + .expectNext("OK") + .expectComplete() + .verify(TIMEOUT); + assertRetry(); + } + + @Test + void getWithoutRetryWhen200() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-200") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + final Mono bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString) + .expectNext("OK") + .expectComplete() + .verify(TIMEOUT); + assertNoRetry(); + } + + @Test + void getWithoutRetryWhen400() throws Exception { + // given + REQUEST_COUNTER = new AtomicInteger(); + final HttpRequest httpRequest = requestFor("/retry-get-400") + .method(HttpMethod.GET) + .build(); + final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder() + .retryConfig(defaultRetryConfig() + .retryableHttpResponseCodes(HashSet.of(500)) + .build()) + .build()); + + // when + Mono result = cut.call(httpRequest); + + // then + StepVerifier.create(result) + .consumeNextWith(this::assert400) + .expectComplete() + .verify(TIMEOUT); + assertNoRetry(); + } + + private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException { + return ImmutableHttpRequest.builder() + .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString()); + } + + private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException { + return ImmutableHttpRequest.builder() + .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString()); + } + + private ImmutableRetryConfig.Builder defaultRetryConfig() { + return ImmutableRetryConfig.builder() + .retryCount(RETRY_COUNT) + .retryInterval(RETRY_INTERVAL); + } + + private void assertRetry() { + assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY); + } + + private void assertNoRetry() { + assertThat(REQUEST_COUNTER.get()).isOne(); + } + + private void assertNoServerResponse() { + assertThat(REQUEST_COUNTER.get()).isZero(); + } + + private void assert400(HttpResponse httpResponse) { + assertThat(httpResponse.statusCode()).isEqualTo(400); } } -- cgit 1.2.3-korg