diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java | 150 |
1 files changed, 116 insertions, 34 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index bd161aab..1f4e499d 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -22,14 +22,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import eu.rekawek.toxiproxy.Proxy; -import eu.rekawek.toxiproxy.ToxiproxyClient; import io.vavr.collection.List; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.Times; +import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.testcontainers.containers.DockerComposeContainer; @@ -39,11 +43,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse; @@ -52,19 +56,23 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse; - import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; @Testcontainers class MessageRouterSubscriberIT { + @Container + private static final DockerComposeContainer CONTAINER = createContainerInstance(); + private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient( + LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + private static String EVENTS_PATH; + private static String PROXY_MOCK_EVENTS_PATH; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static String PROXY_EVENTS_PATH; - private static Proxy DMAAP_PROXY; private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + "{" + "\"mrstatus\":3001," + @@ -85,27 +93,21 @@ class MessageRouterSubscriberIT { + "}" + "}"; - @Container - private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance(); - - private static String EVENTS_PATH; - private MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); private MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - @BeforeAll - static void setUp() throws IOException { + static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); - PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT); - - DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy", - String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT), - String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); } + @BeforeEach + void set() { + MOCK_SERVER_CLIENT.reset(); + } @Test void subscriber_shouldHandleNoSuchTopicException() { @@ -128,7 +130,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriberShouldHandleSingleItemResponse(){ + void subscriberShouldHandleSingleItemResponse() { //given final String topic = "TOPIC"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -207,7 +209,7 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldSubscribeToTopic(){ + void subscriber_shouldSubscribeToTopic() { //given final String topic = "TOPIC4"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); @@ -234,17 +236,16 @@ class MessageRouterSubscriberIT { } @Test - void subscriber_shouldHandleTimeoutException() throws IOException { + void subscriber_shouldHandleTimeoutException() { //given - final String topic = "newTopic"; + final String topic = "TOPIC5"; final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest( - String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); - final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse( - TIMEOUT_ERROR_MESSAGE); - - final String toxic = "latency-toxic"; - DMAAP_PROXY.toxics() - .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5)); + String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE); + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 5)); //when Mono<MessageRouterSubscribeResponse> response = subscriber @@ -256,7 +257,88 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); - //cleanup - DMAAP_PROXY.toxics().get(toxic).remove(); + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); + } + + @Test + void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC6"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withStatusCode(404)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + @Test + void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { + //given + final String topic = "TOPIC7"; + final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic); + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest( + proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1)); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID); + MOCK_SERVER_CLIENT + .when(request().withPath(path), Times.once()) + .respond(response().withDelay(TimeUnit.SECONDS, 10)); + final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig()); + + //when + registerTopic(publisher, createPublishRequest(topicUrl), subscriber, + createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID)); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + + MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); + } + + private MessageRouterSubscriberConfig retryConfig() { + return ImmutableMessageRouterSubscriberConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(1) + .retryCount(1) + .build()) + .build(); } } |