aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2021-01-19 09:00:56 +0100
committertkogut <tomasz.kogut@nokia.com>2021-01-20 12:20:55 +0100
commit9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 (patch)
tree58c9e881f694fde8347762b6c237de9423f33f23
parent286637d4a801ab6e933684500509eab308d2e3a6 (diff)
Support retry in DCAE-SDK DMaaP-Client
Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e
-rw-r--r--pom.xml8
-rw-r--r--rest-services/dmaap-client/pom.xml15
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java41
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java10
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java (renamed from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java)4
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java (renamed from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java)4
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java (renamed from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java)4
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java9
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java20
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java4
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java7
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java64
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java (renamed from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java)4
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java6
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java11
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java124
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java148
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java150
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java149
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java6
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java22
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java14
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java71
-rw-r--r--rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml8
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java68
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java36
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java59
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java29
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java38
-rw-r--r--rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java263
30 files changed, 1058 insertions, 338 deletions
diff --git a/pom.xml b/pom.xml
index 80b0c915..2c2ed16b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,8 +60,8 @@
<properties>
<java.version>11</java.version>
- <junit-jupiter.version>5.3.1</junit-jupiter.version>
- <junit-vintage.version>5.3.1</junit-vintage.version>
+ <junit-jupiter.version>5.7.0</junit-jupiter.version>
+ <junit-vintage.version>5.7.0</junit-vintage.version>
<junit-platform.version>1.3.1</junit-platform.version>
<immutables.version>2.7.5</immutables.version>
<assertj-core.version>3.12.2</assertj-core.version>
@@ -74,11 +74,11 @@
<commons-text.version>1.6</commons-text.version>
<jetbrains-annotations.version>16.0.3</jetbrains-annotations.version>
<protoc-jar-maven-plugin.version>3.6.0.2</protoc-jar-maven-plugin.version>
- <testcontainers.version>1.15.0</testcontainers.version>
+ <testcontainers.version>1.15.1</testcontainers.version>
<spring.boot.version>2.4.0</spring.boot.version>
<system.rules.version>1.17.2</system.rules.version>
<openapi4j.version>1.0.3</openapi4j.version>
- <toxiproxy-java.version>2.1.4</toxiproxy-java.version>
+ <mockserver-client.version>5.11.2</mockserver-client.version>
<sonar.coverage.jacoco.xmlReportPaths>
${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
</sonar.coverage.jacoco.xmlReportPaths>
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml
index d619590f..b8620311 100644
--- a/rest-services/dmaap-client/pom.xml
+++ b/rest-services/dmaap-client/pom.xml
@@ -56,6 +56,10 @@
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
@@ -76,11 +80,10 @@
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
</dependency>
- <dependency>
- <groupId>eu.rekawek.toxiproxy</groupId>
- <artifactId>toxiproxy-java</artifactId>
- <version>${toxiproxy-java.version}</version>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.mock-server</groupId>
+ <artifactId>mockserver-client-java</artifactId>
+ <version>${mockserver-client.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
index 3c27da10..9d255559 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,17 +19,28 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import java.time.Duration;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES;
+
/**
- *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since 1.1.4
*/
@@ -44,19 +55,37 @@ public final class DmaapClientFactory {
return new MessageRouterPublisherImpl(
createHttpClient(clientConfiguration),
clientConfiguration.maxBatchSize(),
- clientConfiguration.maxBatchDuration());
+ clientConfiguration.maxBatchDuration(),
+ new ClientErrorReasonPresenter());
}
public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(
@NotNull MessageRouterSubscriberConfig clientConfiguration) {
return new MessageRouterSubscriberImpl(
createHttpClient(clientConfiguration),
- clientConfiguration.gsonInstance());
+ clientConfiguration.gsonInstance(),
+ new ClientErrorReasonPresenter());
}
private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) {
+ RxHttpClientConfig clientConfig = ImmutableRxHttpClientConfig.builder()
+ .retryConfig(createRetry(config))
+ .build();
return config.securityKeys() == null
- ? RxHttpClientFactory.create()
- : RxHttpClientFactory.create(config.securityKeys());
+ ? RxHttpClientFactory.create(clientConfig)
+ : RxHttpClientFactory.create(config.securityKeys(), clientConfig);
+ }
+
+ private static RetryConfig createRetry(DmaapClientConfiguration config) {
+ return Option.of(config.retryConfig())
+ .map(rc -> ImmutableRetryConfig.builder()
+ .retryInterval(Duration.ofSeconds(rc.retryIntervalInSeconds()))
+ .retryCount(rc.retryCount())
+ .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES)
+ .customRetryableExceptions(RETRYABLE_EXCEPTIONS)
+ .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION)
+ .build())
+ .getOrNull();
}
}
+
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
index 6b22b378..1eaae78e 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,15 +23,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ClientError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableClientError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableRequestError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableServiceException;
public class ClientErrorReasonPresenter {
- private ClientErrorReasonPresenter() { }
-
private static final Gson GSON = new GsonBuilder().create();
private static final String PATTERN = "%s\n%s";
- public static String present(ClientErrorReason clientErrorReason) {
+ public String present(ClientErrorReason clientErrorReason) {
ImmutableServiceException simpleServiceException = ImmutableServiceException.builder()
.messageId(clientErrorReason.messageId())
.text(clientErrorReason.text())
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java
index 57187c80..d0cb35da 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java
index 71e673fe..79b9a299 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java
index e99330ac..a39fbc0b 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 16068da0..7d1b0a93 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,12 +61,15 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
private final RxHttpClient httpClient;
private final int maxBatchSize;
private final Duration maxBatchDuration;
+ private final ClientErrorReasonPresenter clientErrorReasonPresenter;
+
private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
- public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) {
+ public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) {
this.httpClient = httpClient;
this.maxBatchSize = maxBatchSize;
this.maxBatchDuration = maxBatchDuration;
+ this.clientErrorReasonPresenter = clientErrorReasonPresenter;
}
@Override
@@ -124,7 +127,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
}
private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
- String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterPublishResponse.builder()
.failReason(failReason)
.build());
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index f7ccf4f2..292a7157 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -20,16 +20,12 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
-
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.List;
-import java.nio.charset.StandardCharsets;
-
import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -48,6 +44,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import java.nio.charset.StandardCharsets;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
@@ -55,11 +55,14 @@ import reactor.core.publisher.Mono;
public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
private final RxHttpClient httpClient;
private final Gson gson;
+ private final ClientErrorReasonPresenter clientErrorReasonPresenter;
private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
- public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson) {
+ public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
+ ClientErrorReasonPresenter clientErrorReasonPresenter) {
this.httpClient = httpClient;
this.gson = gson;
+ this.clientErrorReasonPresenter = clientErrorReasonPresenter;
}
@Override
@@ -67,7 +70,8 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
return httpClient.call(buildGetHttpRequest(request))
.map(this::buildGetResponse)
- .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+ .doOnError(ReadTimeoutException.class,
+ e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
.onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
}
@@ -91,7 +95,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
: builder.failReason(extractFailReason(httpResponse)).build();
}
- private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){
+ private List<JsonElement> getAsJsonElements(HttpResponse httpResponse) {
JsonArray bodyAsJsonArray = httpResponse
.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
@@ -104,7 +108,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
}
private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
- String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+ String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
.failReason(failReason)
.build());
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
index 95c5e7d1..a5a87fbd 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
/**
@@ -35,5 +35,5 @@ public interface DmaapRequest {
return RequestDiagnosticContext.create();
}
- @Nullable TimeoutConfig timeoutConfig();
+ @Nullable DmaapTimeoutConfig timeoutConfig();
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java
index ac677f02..3e283511 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,5 +32,8 @@ public interface DmaapClientConfiguration {
default @Nullable SecurityKeys securityKeys() {
return null;
}
-
+ @Value.Default
+ default @Nullable DmaapRetryConfig retryConfig(){
+ return null;
+ }
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
new file mode 100644
index 00000000..f82edfc9
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashSet;
+import io.vavr.collection.Set;
+import org.immutables.value.Value;
+
+import java.net.ConnectException;
+
+@Value.Immutable
+public interface DmaapRetryConfig {
+
+ Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class);
+ RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE;
+ Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
+
+ @Value.Default
+ default int retryCount() {
+ return 3;
+ }
+
+ @Value.Default
+ default int retryIntervalInSeconds() {
+ return 1;
+ }
+
+ @Value.Check
+ default void validate() {
+ validateRetryCount();
+ validateRetryInterval();
+ }
+
+ private void validateRetryCount() {
+ int rc = retryCount();
+ if (rc < 0)
+ throw new IllegalArgumentException(String.format("Invalid value: %d, retryCount should be (0-n)", rc));
+ }
+
+ private void validateRetryInterval() {
+ long ri = retryIntervalInSeconds();
+ if (ri < 1)
+ throw new IllegalArgumentException(String.format("Invalid value: %d, retryInterval should be (1-n)", ri));
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java
index 413bf8e5..0ece899b 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import org.immutables.value.Value;
import java.time.Duration;
@Value.Immutable
-public interface TimeoutConfig {
+public interface DmaapTimeoutConfig {
@Value.Default
default Duration getTimeout() {
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
index 2b8027c1..1a315806 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
@@ -38,7 +38,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
import reactor.core.publisher.Flux;
import java.time.Duration;
@@ -56,7 +56,7 @@ public final class MessageRouterTestsUtils {
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(createMessageRouterSink(topicUrl))
.contentType(ContentType.APPLICATION_JSON)
- .timeoutConfig(ImmutableTimeoutConfig.builder()
+ .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
.timeout(timeout)
.build())
.build();
@@ -86,7 +86,7 @@ public final class MessageRouterTestsUtils {
return ImmutableMessageRouterSubscribeRequest
.builder()
- .timeoutConfig(ImmutableTimeoutConfig.builder()
+ .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
.timeout(timeout)
.build())
.sourceDefinition(getImmutableMessageRouterSource(topicUrl))
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
index 494ca62a..5b1984df 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,11 +27,10 @@ import java.net.URL;
final class DMaapContainer {
private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
- private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
- MR_COMPOSE_RESOURCE_NAME);
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME);
static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
static final String DMAAP_SERVICE_NAME = "dmaap";
- static final int PROXY_SERVICE_EXPOSED_PORT = 8666;
+ static final int PROXY_MOCK_SERVICE_EXPOSED_PORT = 1080;
static final String LOCALHOST = "localhost";
private DMaapContainer() {}
@@ -43,11 +42,11 @@ final class DMaapContainer {
.withLocalCompose(true);
}
- private static String getDockerComposeFilePath(String resourceName){
+ private static String getDockerComposeFilePath(String resourceName) {
URL resource = DMaapContainer.class.getClassLoader()
.getResource(resourceName);
- if(resource != null) return resource.getFile();
+ if (resource != null) return resource.getFile();
else throw new DockerComposeNotFoundException(String
.format("File %s does not exist", resourceName));
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index 24cd2c34..f6ef94b7 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,16 +22,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import eu.rekawek.toxiproxy.Proxy;
-import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.matchers.TimeToLive;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import org.testcontainers.containers.DockerComposeContainer;
@@ -41,11 +46,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
-import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
@@ -56,14 +61,19 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
@Testcontainers
class MessageRouterPublisherIT {
@Container
- private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+ private static final DockerComposeContainer CONTAINER = createContainerInstance();
+ private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
+ LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ private static String EVENTS_PATH;
+ private static String PROXY_MOCK_EVENTS_PATH;
+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ "{"
@@ -85,22 +95,21 @@ class MessageRouterPublisherIT {
+ "}"
+ "}"
+ "}";
- private static Proxy DMAAP_PROXY;
- private static String EVENTS_PATH;
- private static String PROXY_EVENTS_PATH;
+
private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
private final MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@BeforeAll
- static void setUp() throws IOException {
+ static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
- PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+ PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ }
- DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
- String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
- String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
+ @BeforeEach
+ void set() {
+ MOCK_SERVER_CLIENT.reset();
}
@Test
@@ -302,17 +311,18 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
+ void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
//given
- final String toxic = "latency-toxic";
- DMAAP_PROXY.toxics()
- .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
final String topic = "TOPIC10";
final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
final MessageRouterPublishRequest mrRequest = createPublishRequest(
- String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
+ String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
@@ -323,7 +333,75 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify(TIMEOUT);
- //cleanup
- DMAAP_PROXY.toxics().get(toxic).remove();
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
+ }
+
+ @Test
+ void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
+ final String topic = "TOPIC11";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
+ @Test
+ void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
+ final String topic = "TOPIC12";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 10));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
+ private MessageRouterPublisherConfig retryConfig() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(1)
+ .retryCount(1)
+ .build())
+ .build();
}
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index b0a07eda..82b6661c 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,14 +25,18 @@ import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -40,8 +44,10 @@ import reactor.test.StepVerifier;
import java.time.Duration;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -50,12 +56,14 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
class MessageRouterPublisherTest {
private static final String ERROR_MESSAGE = "Something went wrong";
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
+ private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
- private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+ private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500";
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
.map(JsonPrimitive::new);
@@ -69,29 +77,22 @@ class MessageRouterPublisherTest {
@BeforeAll
static void setUp() {
server = DummyHttpServer.start(routes -> routes
- .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) ->
- sendString(resp, Mono.just("OK")))
- .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
- sendError(resp, 400, ERROR_MESSAGE))
- .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
- sendError(resp, 404, ERROR_MESSAGE))
- .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE))
+ .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
+ .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
+ .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
+ .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE))
+ .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))
+ .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE))
+ .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))
);
}
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
- ContentType.TEXT_PLAIN);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
-
//when
final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
@@ -102,13 +103,18 @@ class MessageRouterPublisherTest {
.verify(TIMEOUT);
}
- @Test
- void publisher_shouldHandleBadRequestError() {
+ @ParameterizedTest
+ @CsvSource({
+ FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",
+ FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized",
+ FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden",
+ FAILING_WITH_404_RESP_PATH + "," + "404 Not Found",
+ FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error"
+ })
+ void publisher_shouldHandleError(String failingPath, String failReason) {
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
- ContentType.TEXT_PLAIN);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "400 Bad Request\n%s", ERROR_MESSAGE);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+ final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
//when
final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
@@ -121,83 +127,40 @@ class MessageRouterPublisherTest {
}
@Test
- void publisher_shouldHandleUnauthorizedError() {
+ void publisher_shouldHandleClientTimeoutError() {
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
- ContentType.TEXT_PLAIN);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "401 Unauthorized\n%s", ERROR_MESSAGE);
+ final Duration requestTimeout = Duration.ofMillis(1);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(DELAY_RESP_TOPIC_PATH, requestTimeout);
//when
final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleForbiddenError() {
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
- ContentType.TEXT_PLAIN);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "403 Forbidden\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
+ .consumeNextWith(this::assertTimeoutError)
.expectComplete()
.verify(TIMEOUT);
}
- @Test
- void publisher_shouldHandleNotFoundError() {
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
- ContentType.TEXT_PLAIN);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "404 Not Found\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
+ private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.TEXT_PLAIN)
+ .build();
}
- @Test
- void publisher_shouldHandleInternalServerError() {
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
- ContentType.TEXT_PLAIN);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "500 Internal Server Error\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
+ private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.TEXT_PLAIN)
+ .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build())
+ .build();
}
-
- private MessageRouterPublishRequest createMRRequest(String topicPath, ContentType contentType) {
- final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ private static MessageRouterSink createMRSink(String topicPath) {
+ return ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(String.format("http://%s:%d%s",
server.host(),
@@ -205,18 +168,19 @@ class MessageRouterPublisherTest {
topicPath)
)
.build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .contentType(contentType)
- .build();
}
- private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs) {
+ private static MessageRouterPublishResponse createErrorResponse(String failReason) {
+ String failReasonFormat = failReason + "\n%s";
return ImmutableMessageRouterPublishResponse
.builder()
- .failReason(String.format(failReasonFormat, formatArgs))
+ .failReason(String.format(failReasonFormat, ERROR_MESSAGE))
.build();
}
+
+ private void assertTimeoutError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index bd161aab..1f4e499d 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -22,14 +22,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import eu.rekawek.toxiproxy.Proxy;
-import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.vavr.collection.List;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import org.testcontainers.containers.DockerComposeContainer;
@@ -39,11 +43,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
-import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
@@ -52,19 +56,23 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
-
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
@Testcontainers
class MessageRouterSubscriberIT {
+ @Container
+ private static final DockerComposeContainer CONTAINER = createContainerInstance();
+ private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
+ LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ private static String EVENTS_PATH;
+ private static String PROXY_MOCK_EVENTS_PATH;
+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String CONSUMER_GROUP = "group1";
private static final String CONSUMER_ID = "consumer200";
- private static String PROXY_EVENTS_PATH;
- private static Proxy DMAAP_PROXY;
private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
"{" +
"\"mrstatus\":3001," +
@@ -85,27 +93,21 @@ class MessageRouterSubscriberIT {
+ "}"
+ "}";
- @Container
- private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
-
- private static String EVENTS_PATH;
-
private MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
private MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
@BeforeAll
- static void setUp() throws IOException {
+ static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
- PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
-
- DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
- String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
- String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
+ PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
}
+ @BeforeEach
+ void set() {
+ MOCK_SERVER_CLIENT.reset();
+ }
@Test
void subscriber_shouldHandleNoSuchTopicException() {
@@ -128,7 +130,7 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriberShouldHandleSingleItemResponse(){
+ void subscriberShouldHandleSingleItemResponse() {
//given
final String topic = "TOPIC";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -207,7 +209,7 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriber_shouldSubscribeToTopic(){
+ void subscriber_shouldSubscribeToTopic() {
//given
final String topic = "TOPIC4";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -234,17 +236,16 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriber_shouldHandleTimeoutException() throws IOException {
+ void subscriber_shouldHandleTimeoutException() {
//given
- final String topic = "newTopic";
+ final String topic = "TOPIC5";
final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
- String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
- final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
- TIMEOUT_ERROR_MESSAGE);
-
- final String toxic = "latency-toxic";
- DMAAP_PROXY.toxics()
- .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
+ String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 5));
//when
Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -256,7 +257,88 @@ class MessageRouterSubscriberIT {
.expectComplete()
.verify(TIMEOUT);
- //cleanup
- DMAAP_PROXY.toxics().get(toxic).remove();
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
+ }
+
+ @Test
+ void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC6";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
+ @Test
+ void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC7";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 10));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
+ private MessageRouterSubscriberConfig retryConfig() {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(1)
+ .retryCount(1)
+ .build())
+ .build();
}
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
index 18584789..06875394 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,30 +20,34 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-
-import java.time.Duration;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
import reactor.test.StepVerifier;
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
@@ -51,8 +55,10 @@ import reactor.test.StepVerifier;
class MessageRouterSubscriberTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String ERROR_MESSAGE = "Something went wrong";
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
private static final String CONSUMER_GROUP = "group1";
private static final String SUCCESS_CONSUMER_ID = "consumer200";
+ private static final String DELAY_CONSUMER_ID = "delay200";
private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
@@ -63,6 +69,8 @@ class MessageRouterSubscriberTest {
private static final String SUCCESS_RESP_PATH = String
.format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
+ private static final String DELAY_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, DELAY_CONSUMER_ID);
private static final String FAILING_WITH_401_RESP_PATH = String
.format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
private static final String FAILING_WITH_403_RESP_PATH = String
@@ -83,7 +91,15 @@ class MessageRouterSubscriberTest {
@BeforeAll
static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberTest::setRoutes);
+ DummyHttpServer server = DummyHttpServer.start(routes -> routes
+ .get(SUCCESS_RESP_PATH, (req, resp) ->
+ sendResource(resp, "/sample-mr-subscribe-response.json"))
+ .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
+ .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE))
+ .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))
+ .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE))
+ .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
+ .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
sourceDefinition = createMessageRouterSource(server);
@@ -110,69 +126,19 @@ class MessageRouterSubscriberTest {
.verify(TIMEOUT);
}
- @Test
- void subscriber_shouldGetUnauthorizedErrorResponse() {
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetForbiddenErrorResponse() {
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetConflictErrorResponse() {
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
+ @ParameterizedTest
+ @CsvSource({
+ FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized",
+ FAILING_WITH_403_CONSUMER_ID + "," + "403 Forbidden",
+ FAILING_WITH_409_CONSUMER_ID + "," + "409 Conflict",
+ FAILING_WITH_429_CONSUMER_ID + "," + "429 Too Many Requests",
+ FAILING_WITH_500_CONSUMER_ID + "," + "500 Internal Server Error"
+ })
+ void subscriber_shouldHandleError(String consumerId, String failReason) {
+ MessageRouterSubscribeRequest request = createFailingRequest(consumerId);
Mono<MessageRouterSubscribeResponse> response = sut.get(request);
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("409 Conflict\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetTooManyRequestsErrorResponse() {
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetInternalServerErrorResponse() {
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrFailingRequest);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(failReason);
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -226,20 +192,16 @@ class MessageRouterSubscriberTest {
.verify(TIMEOUT);
}
- private static HttpServerRoutes setRoutes(HttpServerRoutes routes) {
- return routes
- .get(SUCCESS_RESP_PATH, (req, resp) ->
- sendResource(resp, "/sample-mr-subscribe-response.json"))
- .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
- sendError(resp, 409, ERROR_MESSAGE))
- .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
- sendError(resp, 429, ERROR_MESSAGE))
- .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE));
+ @Test
+ void subscriber_shouldHandleClientTimeoutError() {
+ Duration requestTimeout = Duration.ofMillis(1);
+ MessageRouterSubscribeRequest request = createDelayRequest(DELAY_CONSUMER_ID, requestTimeout);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ StepVerifier.create(response)
+ .consumeNextWith(this::assertTimeoutError)
+ .expectComplete()
+ .verify(TIMEOUT);
}
private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
@@ -257,6 +219,15 @@ class MessageRouterSubscriberTest {
.build();
}
+ private static MessageRouterSubscribeRequest createDelayRequest(String consumerId, Duration timeout) {
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(CONSUMER_GROUP)
+ .consumerId(consumerId)
+ .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build())
+ .build();
+ }
+
private static MessageRouterSubscribeRequest createFailingRequest(String consumerId) {
return ImmutableMessageRouterSubscribeRequest
.builder()
@@ -266,11 +237,17 @@ class MessageRouterSubscriberTest {
.build();
}
- private static MessageRouterSubscribeResponse createErrorResponse(String failReason) {
+ private MessageRouterSubscribeResponse createErrorResponse(String failReason) {
+ String failReasonFormat = failReason + "\n%s";
return ImmutableMessageRouterSubscribeResponse
.builder()
- .failReason(failReason)
+ .failReason(String.format(failReasonFormat, ERROR_MESSAGE))
.build();
}
+
+ private void assertTimeoutError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
index 9b318d73..76d7a381 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ class ClientErrorReasonPresenterTest {
+ "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}";
//when
- String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+ String actual = new ClientErrorReasonPresenter().present(clientErrorReason);
//then
assertThat(actual).isEqualTo(expected);
@@ -48,7 +48,7 @@ class ClientErrorReasonPresenterTest {
+ "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}";
//when
- String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+ String actual = new ClientErrorReasonPresenter().present(clientErrorReason);
//then
assertThat(actual).isEqualTo(expected);
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index f29bfa27..2825a87c 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
@@ -69,9 +70,11 @@ class MessageRouterPublisherImplTest {
private static final Duration TIMEOUT = Duration.ofSeconds(5);
private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
private static final int MAX_BATCH_SIZE = 3;
- public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
+ private static final String ERROR_MESSAGE = "Something went wrong";
private final RxHttpClient httpClient = mock(RxHttpClient.class);
- private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
+ private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
+ private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
+ httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
@@ -417,7 +420,10 @@ class MessageRouterPublisherImplTest {
final List<String> plainMessage = List.of("I", "like", "cookies");
final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
// when
final Flux<MessageRouterPublishResponse> responses = cut
@@ -439,9 +445,12 @@ class MessageRouterPublisherImplTest {
final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
given(httpClient.call(any(HttpRequest.class)))
.willReturn(Mono.just(successHttpResponse))
.willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
// when
final Flux<MessageRouterPublishResponse> responses = cut
.put(jsonPublishRequest, doubleJsonMessageBatch);
@@ -463,9 +472,12 @@ class MessageRouterPublisherImplTest {
final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
given(httpClient.call(any(HttpRequest.class)))
.willReturn(Mono.error(ReadTimeoutException.INSTANCE))
.willReturn(Mono.just(successHttpResponse));
+
// when
final Flux<MessageRouterPublishResponse> responses = cut
.put(jsonPublishRequest, doubleJsonMessageBatch);
@@ -540,7 +552,7 @@ class MessageRouterPublisherImplTest {
private void assertTimeoutError(MessageRouterPublishResponse response) {
assertThat(response.failed()).isTrue();
assertThat(response.items()).isEmpty();
- assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
+ assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
}
private void verifySingleResponse(List<? extends JsonElement> threeMessages,
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
index 1f97001e..0396eff9 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
@@ -35,6 +35,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
@@ -47,10 +48,12 @@ import reactor.core.publisher.Mono;
*/
class MessageRouterSubscriberImplTest {
+ private static final String ERROR_MESSAGE = "Something went wrong";
private final RxHttpClient httpClient = mock(RxHttpClient.class);
+ private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
private final MessageRouterSubscriber
- cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
+ cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
@@ -136,7 +139,10 @@ class MessageRouterSubscriberImplTest {
void getWithProperRequest_shouldReturnTimeoutError() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
// when
final Mono<MessageRouterSubscribeResponse> responses = cut
@@ -145,7 +151,7 @@ class MessageRouterSubscriberImplTest {
// then
assertThat(response.failed()).isTrue();
- assertThat(response.failReason()).contains("408 Request Timeout");
+ assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
assertThat(response.hasElements()).isFalse();
@@ -156,4 +162,4 @@ class MessageRouterSubscriberImplTest {
mrRequest.consumerGroup(), mrRequest.consumerId()));
assertThat(httpRequest.body()).isNull();
}
-} \ No newline at end of file
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java
new file mode 100644
index 00000000..da3a88fe
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class DmaapRetryConfigTest {
+ @Test
+ void shouldSuccessfullyCreateObject() {
+ DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(1)
+ .retryCount(0)
+ .build();
+
+ assertThat(retryConfig.retryIntervalInSeconds()).isOne();
+ assertThat(retryConfig.retryCount()).isZero();
+ }
+
+ @Test
+ void shouldSuccessfullyCreateObjectForDefaults() {
+ DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder().build();
+
+ assertThat(retryConfig.retryIntervalInSeconds()).isOne();
+ assertThat(retryConfig.retryCount()).isEqualTo(3);
+ }
+
+ @Test
+ void shouldThrowInvalidArgumentExceptionForInvalidRetryInterval() {
+ assertThrows(IllegalArgumentException.class, () -> withRetryInterval(0));
+ assertThrows(IllegalArgumentException.class, () -> withRetryInterval(-3));
+ }
+
+ @Test
+ void shouldThrowInvalidArgumentExceptionForInvalidRetryCount() {
+ assertThrows(IllegalArgumentException.class, () -> withRetryCount(-1));
+ assertThrows(IllegalArgumentException.class, () -> withRetryCount(-3));
+ }
+
+ private void withRetryInterval(int ri) {
+ ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(ri)
+ .build();
+ }
+
+ private void withRetryCount(int rc) {
+ ImmutableDmaapRetryConfig.builder()
+ .retryCount(rc)
+ .build();
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
index ab6641cb..26eb1763 100644
--- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
+++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
@@ -67,11 +67,11 @@ services:
- zookeeper
- kafka
- toxiproxy:
- image: shopify/toxiproxy:2.1.4
+ mockserver:
+ image: mockserver/mockserver:mockserver-5.11.2
+ command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap
ports:
- - "8474:8474"
- - "8666:8666"
+ - "1080:1090"
networks:
- net
depends_on:
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
index 77b842d7..d0bdf414 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,18 +17,25 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
+
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vavr.collection.HashSet;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
import java.util.stream.Collectors;
@@ -39,17 +46,23 @@ public class RxHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
private final HttpClient httpClient;
+ private RetryConfig retryConfig;
RxHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
+ RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+ this(httpClient);
+ this.retryConfig = retryConfig;
+ }
+
public Mono<HttpResponse> call(HttpRequest request) {
- return prepareRequest(request)
- .responseSingle((resp, content) ->
- content.asByteArray()
- .defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes)));
+ Mono<HttpResponse> httpResponseMono = response(request);
+ return Option.of(retryConfig)
+ .map(rc -> retryConfig(rc, request.diagnosticContext()))
+ .map(httpResponseMono::retryWhen)
+ .getOrElse(() -> httpResponseMono);
}
ResponseReceiver<?> prepareRequest(HttpRequest request) {
@@ -65,6 +78,27 @@ public class RxHttpClient {
return prepareBody(request, theClient);
}
+ private Mono<HttpResponse> response(HttpRequest request) {
+ return prepareRequest(request)
+ .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+ }
+
+ private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+ if (shouldRetry(status.code())) {
+ return Mono.error(new RetryConfig.RetryableException());
+ }
+ return content.asByteArray()
+ .defaultIfEmpty(new byte[0])
+ .map(bytes -> new NettyHttpResponse(url, status, bytes));
+ }
+
+ private boolean shouldRetry(int code) {
+ return Option.of(retryConfig)
+ .map(RetryConfig::retryableHttpResponseCodes)
+ .getOrElse(HashSet::empty)
+ .contains(code);
+ }
+
private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
if (request.body() == null) {
return prepareBodyWithoutContents(request, theClient);
@@ -79,7 +113,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -87,7 +121,7 @@ public class RxHttpClient {
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
.request(request.method().asNetty())
- .send(request.body().contents())
+ .send(Flux.from(request.body().contents()))
.uri(request.url());
}
@@ -114,4 +148,22 @@ public class RxHttpClient {
context.withSlf4jMdc(LOGGER.isDebugEnabled(),
() -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
}
+
+ private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
+ RetryBackoffSpec retry = Retry
+ .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
+ .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
+ LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
+ .filter(ex -> isRetryable(retryConfig, ex));
+
+ return Option.of(retryConfig.onRetryExhaustedException())
+ .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
+ .getOrElse(retry);
+ }
+
+ private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
+ return retryConfig.retryableExceptions()
+ .toStream()
+ .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
+ }
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
index 9b23f1d9..118df52b 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import io.netty.handler.ssl.SslContext;
+import io.vavr.control.Option;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys;
@@ -42,23 +44,53 @@ public final class RxHttpClientFactory {
return new RxHttpClient(HttpClient.create());
}
+ public static RxHttpClient create(RxHttpClientConfig config) {
+ return createWithConfig(HttpClient.create(), config);
+ }
public static RxHttpClient create(SecurityKeys securityKeys) {
final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
return create(context);
}
+ public static RxHttpClient create(SecurityKeys securityKeys, RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
+ return create(context, config);
+ }
+
public static RxHttpClient create(TrustStoreKeys trustStoreKeys) {
final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
return create(context);
}
+ public static RxHttpClient create(TrustStoreKeys trustStoreKeys, RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
+ return create(context, config);
+ }
+
public static RxHttpClient createInsecure() {
final SslContext context = SSL_FACTORY.createInsecureClientContext();
return create(context);
}
+ public static RxHttpClient createInsecure(RxHttpClientConfig config) {
+ final SslContext context = SSL_FACTORY.createInsecureClientContext();
+ return create(context, config);
+ }
+
private static RxHttpClient create(@NotNull SslContext sslContext) {
- return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+ HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+ return new RxHttpClient(secure);
+ }
+
+ private static RxHttpClient create(@NotNull SslContext sslContext, RxHttpClientConfig config) {
+ HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+ return createWithConfig(secure, config);
+ }
+
+ private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) {
+ return Option.of(config.retryConfig())
+ .map(retryConfig -> new RxHttpClient(httpClient, retryConfig))
+ .getOrElse(() -> new RxHttpClient(httpClient));
}
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
new file mode 100644
index 00000000..a0ae1991
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import io.vavr.collection.HashSet;
+import io.vavr.collection.Set;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface RetryConfig {
+
+ int retryCount();
+
+ Duration retryInterval();
+
+ @Value.Default
+ default Set<Integer> retryableHttpResponseCodes() {
+ return HashSet.empty();
+ }
+
+ @Value.Default
+ default Set<Class<? extends Throwable>> customRetryableExceptions() {
+ return HashSet.empty();
+ }
+
+ @Value.Derived
+ default Set<Class<? extends Throwable>> retryableExceptions() {
+ Set<Class<? extends Throwable>> result = customRetryableExceptions();
+ if (retryableHttpResponseCodes().nonEmpty()) {
+ result = result.add(RetryableException.class);
+ }
+ return result;
+ }
+
+ @Nullable RuntimeException onRetryExhaustedException();
+
+ class RetryableException extends RuntimeException {}
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java
new file mode 100644
index 00000000..78a88a47
--- /dev/null
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+@Value.Immutable
+public interface RxHttpClientConfig {
+ @Nullable RetryConfig retryConfig();
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
index 4795b00f..8ac0d1d5 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +21,8 @@
package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test;
import io.vavr.CheckedFunction0;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import io.vavr.Tuple3;
+import io.vavr.control.Try;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +32,13 @@ import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerResponse;
import reactor.netty.http.server.HttpServerRoutes;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since February 2019
@@ -63,11 +67,26 @@ public class DummyHttpServer {
return responses[state.getAndIncrement()];
}
+ public static Publisher<Void> sendInOrderWithDelay(AtomicInteger counter, Tuple3<HttpServerResponse, Integer, Duration>... responses) {
+ Tuple3<HttpServerResponse, Integer, Duration> tuple = responses[counter.get()];
+ HttpServerResponse httpServerResponse = tuple._1;
+ Integer statusCode = tuple._2;
+ long timeout = tuple._3.toMillis();
+ Try.run(() -> Thread.sleep(timeout));
+ counter.incrementAndGet();
+ return sendString(httpServerResponse.status(statusCode), Mono.just("OK"));
+ }
+
+ public static Publisher<Void> sendWithDelay(HttpServerResponse response, int statusCode, Duration timeout) {
+ Try.run(() -> Thread.sleep(timeout.toMillis()));
+ return sendString(response.status(statusCode), Mono.just("OK"));
+ }
+
public static Publisher<Void> sendResource(HttpServerResponse httpServerResponse, String resourcePath) {
return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath)));
}
- public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){
+ public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message) {
return sendString(httpServerResponse.status(statusCode), Mono.just(message));
}
@@ -79,6 +98,11 @@ public class DummyHttpServer {
server.disposeNow();
}
+ public DummyHttpServer closeAndGet() {
+ server.disposeNow();
+ return this;
+ }
+
public String host() {
return server.host();
}
diff --git a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
index 6f3a0909..daf04c6e 100644
--- a/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
+++ b/rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
@@ -2,7 +2,7 @@
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,33 +22,55 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.Tuple;
+import io.vavr.collection.HashSet;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
class RxHttpClientIT {
private static final Duration TIMEOUT = Duration.ofHours(5);
- private final RxHttpClient cut = RxHttpClientFactory.create();
- private static DummyHttpServer httpServer;
-
- @BeforeAll
- static void setUpClass() {
- httpServer = DummyHttpServer.start(routes -> routes
- .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
- .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1))))
+ private static final Duration NO_DELAY = Duration.ofSeconds(0);
+ private static final int RETRY_COUNT = 1;
+ private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
+ private static final DummyHttpServer HTTP_SERVER = initialize();
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final Mono<String> OK = Mono.just("OK");
+ private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
+ private static AtomicInteger REQUEST_COUNTER;
+
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
+ .get("/sample-get", (req, resp) -> sendString(resp, OK))
+ .get("/delay-get", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
.get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
+ .get("/retry-get-500", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER,
+ Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
+ .get("/retry-get-400", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
+ .get("/retry-get-500-200", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER,
+ Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
+ .get("/retry-get-200", (req, resp) ->
+ sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
.post("/headers-post", (req, resp) -> resp
.sendString(Mono.just(req.requestHeaders().toString())))
.post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
@@ -57,12 +79,7 @@ class RxHttpClientIT {
@AfterAll
static void tearDownClass() {
- httpServer.close();
- }
-
- private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
- return ImmutableHttpRequest.builder()
- .url(new URL("http", httpServer.host(), httpServer.port(), path).toString());
+ HTTP_SERVER.close();
}
@Test
@@ -71,6 +88,7 @@ class RxHttpClientIT {
final HttpRequest httpRequest = requestFor("/sample-get")
.method(HttpMethod.GET)
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -90,6 +108,7 @@ class RxHttpClientIT {
final HttpRequest httpRequest = requestFor("/sample-get-500")
.method(HttpMethod.GET)
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -110,6 +129,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.fromString(requestBody))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -131,6 +151,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.chunkedFromString(Mono.just(requestBody)))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -155,6 +176,7 @@ class RxHttpClientIT {
.method(HttpMethod.POST)
.body(RequestBody.fromString(requestBody))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create();
// when
final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -174,10 +196,12 @@ class RxHttpClientIT {
@Test
void getWithTimeoutError() throws Exception {
// given
- final HttpRequest httpRequest = requestFor("/delayed-get")
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/delay-get")
.method(HttpMethod.GET)
- .timeout(Duration.ofSeconds(1))
+ .timeout(Duration.ofMillis(1))
.build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
// when
final Mono<HttpResponse> response = cut.call(httpRequest);
@@ -186,5 +210,208 @@ class RxHttpClientIT {
StepVerifier.create(response)
.expectError(ReadTimeoutException.class)
.verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .customRetryableExceptions(HashSet.of(ConnectException.class))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .customRetryableExceptions(HashSet.of(ConnectException.class))
+ .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(ReadTimeoutException.class)
+ .verify(TIMEOUT);
+ assertNoServerResponse();
+ }
+
+ @Test
+ void getWithRetryExhaustedExceptionWhen500() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<HttpResponse> response = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(response)
+ .expectError(ReadTimeoutException.class)
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithRetryWhen500AndThen200() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-500-200")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<String> bodyAsString = cut.call(httpRequest)
+ .doOnNext(HttpResponse::throwIfUnsuccessful)
+ .map(HttpResponse::bodyAsString);
+
+ // then
+ StepVerifier.create(bodyAsString)
+ .expectNext("OK")
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertRetry();
+ }
+
+ @Test
+ void getWithoutRetryWhen200() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-200")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ final Mono<String> bodyAsString = cut.call(httpRequest)
+ .doOnNext(HttpResponse::throwIfUnsuccessful)
+ .map(HttpResponse::bodyAsString);
+
+ // then
+ StepVerifier.create(bodyAsString)
+ .expectNext("OK")
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertNoRetry();
+ }
+
+ @Test
+ void getWithoutRetryWhen400() throws Exception {
+ // given
+ REQUEST_COUNTER = new AtomicInteger();
+ final HttpRequest httpRequest = requestFor("/retry-get-400")
+ .method(HttpMethod.GET)
+ .build();
+ final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+ .retryConfig(defaultRetryConfig()
+ .retryableHttpResponseCodes(HashSet.of(500))
+ .build())
+ .build());
+
+ // when
+ Mono<HttpResponse> result = cut.call(httpRequest);
+
+ // then
+ StepVerifier.create(result)
+ .consumeNextWith(this::assert400)
+ .expectComplete()
+ .verify(TIMEOUT);
+ assertNoRetry();
+ }
+
+ private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
+ return ImmutableHttpRequest.builder()
+ .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
+ }
+
+ private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
+ return ImmutableHttpRequest.builder()
+ .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
+ }
+
+ private ImmutableRetryConfig.Builder defaultRetryConfig() {
+ return ImmutableRetryConfig.builder()
+ .retryCount(RETRY_COUNT)
+ .retryInterval(RETRY_INTERVAL);
+ }
+
+ private void assertRetry() {
+ assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
+ }
+
+ private void assertNoRetry() {
+ assertThat(REQUEST_COUNTER.get()).isOne();
+ }
+
+ private void assertNoServerResponse() {
+ assertThat(REQUEST_COUNTER.get()).isZero();
+ }
+
+ private void assert400(HttpResponse httpResponse) {
+ assertThat(httpResponse.statusCode()).isEqualTo(400);
}
}