aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java63
1 files changed, 56 insertions, 7 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 1f4e499d..15c3bd8e 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -245,7 +245,7 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 5));
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
//when
Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -278,7 +278,8 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -314,8 +315,9 @@ class MessageRouterSubscriberIT {
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -333,11 +335,58 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterSubscriberConfig retryConfig() {
+ @Test
+ void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC8";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 5));
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}