aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src
diff options
context:
space:
mode:
authortkogut <tomasz.kogut@nokia.com>2021-01-25 17:56:56 +0100
committertkogut <tomasz.kogut@nokia.com>2021-01-25 17:56:56 +0100
commit214d24db845fe1485f91b03971c40640601881ca (patch)
treed96311161b6c8c41ea1f08a472124e85713b3e53 /rest-services/dmaap-client/src
parent9b309b5e3905cb25d5d661c4428cc9d4ad0402a6 (diff)
Fix problem with resource releases when retry more than twice
Issue-ID: DCAEGEN2-1483 Signed-off-by: tkogut <tomasz.kogut@nokia.com> Change-Id: I8fc58e035226c7c2d499b23f0f31df7bbdb147d4
Diffstat (limited to 'rest-services/dmaap-client/src')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java56
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java63
2 files changed, 105 insertions, 14 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index f6ef94b7..159fc598 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.client.MockServerClient;
-import org.mockserver.matchers.TimeToLive;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
@@ -352,7 +351,9 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -381,8 +382,9 @@ class MessageRouterPublisherIT {
final String path = String.format("/events/%s", topic);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -396,11 +398,51 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterPublisherConfig retryConfig() {
+ @Test
+ void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
+ final String topic = "TOPIC13";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 1f4e499d..15c3bd8e 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -245,7 +245,7 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 5));
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
//when
Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -278,7 +278,8 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -314,8 +315,9 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -333,11 +335,58 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterSubscriberConfig retryConfig() {
+ @Test
+ void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC8";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 5));
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}