aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java321
1 files changed, 126 insertions, 195 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 225d3539..c2e96b58 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -20,102 +20,73 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
-import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonObject;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterSubscriberIT {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final String ERROR_MESSAGE = "Something went wrong";
private static final String CONSUMER_GROUP = "group1";
- private static final String SUCCESS_CONSUMER_ID = "consumer200";
- private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
- private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
- private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
- private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
- private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
-
- private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
-
- private static final String SUCCESS_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
- private static final String FAILING_WITH_401_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
- private static final String FAILING_WITH_403_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
- private static final String FAILING_WITH_409_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
- private static final String FAILING_WITH_429_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
- private static final String FAILING_WITH_500_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
-
- private static MessageRouterSubscribeRequest mrSuccessRequest;
- private static MessageRouterSubscribeRequest mrFailingRequest;
- private MessageRouterSubscriber sut = DmaapClientFactory
+ private static final String CONSUMER_ID = "consumer200";
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
+ "{" +
+ "\"mrstatus\":3001," +
+ "\"helpURL\":\"http://onap.readthedocs.io\"," +
+ "\"message\":\"No such topic exists.-[%s]\"," +
+ "\"status\":404" +
+ "}";
+
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+
+ private static String EVENTS_PATH;
+
+ private MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static MessageRouterSource sourceDefinition;
@BeforeAll
static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
-
- sourceDefinition = createMessageRouterSource(server);
-
- mrSuccessRequest = createSuccessRequest();
-
- mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
- }
-
- @Test
- void subscriber_shouldGetCorrectResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrSuccessRequest);
-
- List<String> expectedItems = List.of("I", "like", "pizza");
-
- MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems.map(JsonPrimitive::new))
- .build();
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
- void subscriber_shouldGetUnauthorizedErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleNoSuchTopicException() {
+ //given
+ final String topic = "newTopic";
+ final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+ String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber
+ .get(mrSubscribeRequest);
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
@@ -123,151 +94,111 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriber_shouldGetForbiddenErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
+ void subscriberShouldHandleSingleItemResponse(){
+ //given
+ final String topic = "TOPIC";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetConflictErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("409 Conflict\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleMultipleItemsResponse() {
+ //given
+ final String topic = "TOPIC2";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetTooManyRequestsErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetInternalServerErrorResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrFailingRequest);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldParseCorrectResponse() {
- final Flux<String> result = sut
- .getElements(mrSuccessRequest)
- .map(JsonElement::getAsString);
-
+ void subscriber_shouldExtractItemsFromResponse() {
+ //given
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
+
+ //then
StepVerifier.create(result)
- .expectNext("I", "like", "pizza")
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
@Test
- void subscriber_shouldParseErrorResponse(){
- Flux<String> result = sut
- .getElements(mrFailingRequest)
- .map(JsonElement::getAsString);
-
- StepVerifier.create(result)
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldSubscribeCorrectly(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectNext("I", "like")
+ void subscriber_shouldSubscribeToTopic(){
+ //given
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
+
+ //then
+ StepVerifier.create(result.take(2))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
- @Test
- void subscriber_shouldParseErrorWhenSubscribed(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
- private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
- return routes
- .get(SUCCESS_RESP_PATH, (req, resp) ->
- sendResource(resp, "/sample-mr-subscribe-response.json"))
- .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
- sendError(resp, 409, ERROR_MESSAGE))
- .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
- sendError(resp, 429, ERROR_MESSAGE))
- .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE));
- }
- private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
- return ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
- .build();
- }
-
- private static MessageRouterSubscribeRequest createSuccessRequest(){
- return ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(SUCCESS_CONSUMER_ID)
- .build();
- }
-
- private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
- return ImmutableMessageRouterSubscribeRequest
- .builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(consumerId)
- .build();
- }
-
- private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
- return ImmutableMessageRouterSubscribeResponse
- .builder()
- .failReason(failReason)
- .build();
- }
}