aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java50
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java207
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java269
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java321
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java125
5 files changed, 375 insertions, 597 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
new file mode 100644
index 00000000..7e6b0d48
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import java.io.File;
+import java.net.URL;
+import org.testcontainers.containers.DockerComposeContainer;
+
+final class DMaapContainer {
+ private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
+ MR_COMPOSE_RESOURCE_NAME);
+ static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+ static final String DMAAP_SERVICE_NAME = "dmaap";
+
+ private DMaapContainer() {}
+
+ static DockerComposeContainer createContainerInstance(){
+ return new DockerComposeContainer(
+ new File(DOCKER_COMPOSE_FILE_PATH))
+ .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
+ }
+
+ private static String getDockerComposeFilePath(String resourceName){
+ URL resource = DMaapContainer.class.getClassLoader()
+ .getResource(resourceName);
+
+ if(resource != null) return resource.getFile();
+ else throw new DockerComposeNotFoundException(String
+ .format("File %s does not exist", resourceName));
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index 9fbd63c8..c746bfec 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -20,97 +20,68 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterPublisherIT {
-
- private static final String ERROR_MESSAGE = "Something went wrong";
- private static final String TEXT_PLAIN_CONTENT_TYPE = "text/plain";
- private static final String JSON_CONTENT_TYPE = "application/json";
- private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
- private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
- private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
- private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
- private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
- private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
- .map(JsonPrimitive::new);
- private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
- private MessageRouterPublisher sut = DmaapClientFactory
+ private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ + "{"
+ + "\"mrstatus\":5007,"
+ + "\"helpURL\":\"http://onap.readthedocs.io\","
+ + "\"message\":\"Error while publishing data to topic.:%s."
+ + "Successfully published number of messages :0."
+ + "Expected { to start an object.\",\"status\":400"
+ + "}";
+ private static String EVENTS_PATH;
+ private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@BeforeAll
static void setUp() {
- server = DummyHttpServer.start(routes ->
- routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
- .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
- sendError(resp, 400, ERROR_MESSAGE))
- .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
- sendError(resp, 404, ERROR_MESSAGE))
- .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE))
- );
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
-
+ final String topic = "TOPIC";
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
//when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleBadRequestError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
- JSON_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "400 Bad Request\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -120,34 +91,17 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleUnauthorizedError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "401 Unauthorized\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleForbiddenError(){
+ void publisher_shouldHandleBadRequestError(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "403 Forbidden\n%s", ERROR_MESSAGE);
+ final String topic = "TOPIC2";
+ final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
+ final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -157,64 +111,51 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleNotFoundError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "404 Not Found\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishSingleMessage(){
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void publisher_shouldHandleInternalServerError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "500 Internal Server Error\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishMultipleMessages(){
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
- }
-
-
- private MessageRouterPublishRequest createMRRequest(String topicPath, String contentType){
- final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
- topicPath)
- )
- .build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .contentType(contentType)
- .build();
- }
-
- private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs){
- return ImmutableMessageRouterPublishResponse
- .builder()
- .failReason(String.format(failReasonFormat, formatArgs))
- .build();
+ .verify();
}
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
deleted file mode 100644
index 32b77a1d..00000000
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.collection.List;
-import java.io.File;
-import java.net.URL;
-import java.time.Duration;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-@Testcontainers
-class MessageRouterSubscriberCIT {
- private static final JsonParser parser = new JsonParser();
- private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
- private static final String CONSUMER_GROUP = "group1";
- private static final String CONSUMER_ID = "consumer200";
- private static final String DMAAP_SERVICE_NAME = "dmaap";
- private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
- private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
- MR_COMPOSE_RESOURCE_NAME);
- private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
- "{" +
- "\"mrstatus\":3001," +
- "\"helpURL\":\"http://onap.readthedocs.io\"," +
- "\"message\":\"No such topic exists.-[%s]\"," +
- "\"status\":404" +
- "}";
-
- @Container
- private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
- new File(DOCKER_COMPOSE_FILE_PATH))
- .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
-
- private static String EVENTS_PATH;
-
- private MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
-
- @BeforeAll
- static void setUp() {
- EVENTS_PATH = String.format("http://%s:%d/events",
- CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
- CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
- }
-
- @Test
- void subscriber_shouldHandleNoSuchTopicException() {
- //given
- final String topic = "newTopic";
- final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
- final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
- final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .failReason(expectedFailReason)
- .build();
-
- //when
- Mono<MessageRouterSubscribeResponse> response = subscriber
- .get(mrSubscribeRequest);
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriberShouldHandleSingleItemResponse(){
- //given
- final String topic = "TOPIC";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
- final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
- final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems)
- .build();
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify();
- }
-
- @Test
- void subscriber_shouldHandleMultipleItemsResponse() {
- //given
- final String topic = "TOPIC2";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
- final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedElements)
- .build();
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
- //then
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify();
- }
-
- @Test
- void subscriber_shouldExtractItemsFromResponse() {
- //given
- final String topic = "TOPIC3";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.getElements(subscribeRequest));
-
- //then
- StepVerifier.create(result)
- .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
- .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldSubscribeToTopic(){
- //given
- final String topic = "TOPIC4";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
- final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
- final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
- //when
- registerTopic(publishRequest, subscribeRequest);
- final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
- .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
-
- //then
- StepVerifier.create(result.take(2))
- .expectNext(messages.get(0))
- .expectNext(messages.get(1))
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- private static String getDockerComposeFilePath(String resourceName){
- URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
- .getResource(resourceName);
-
- if(resource != null) return resource.getFile();
- else throw new DockerComposeNotFoundException(String
- .format("File %s does not exist", resourceName));
- }
-
- private static MessageRouterPublishRequest createMRPublishRequest(String topic){
- MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
- .build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .build();
- }
-
- private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
- ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
- .build();
-
- return ImmutableMessageRouterSubscribeRequest
- .builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(CONSUMER_ID)
- .build();
- }
-
- private void registerTopic(MessageRouterPublishRequest publishRequest,
- MessageRouterSubscribeRequest subscribeRequest) {
- final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
- "{\"differentMessage\":\"message2\"}");
- final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
-
- publisher.put(publishRequest, jsonMessageBatch).blockLast();
- subscriber.get(subscribeRequest).block();
- }
-
- private static Flux<JsonObject> jsonBatch(List<String> messages){
- return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
- }
-
- private JsonObject getAsJsonObject(String item){
- return new Gson().fromJson(item, JsonObject.class);
- }
-}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 225d3539..c2e96b58 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -20,102 +20,73 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
-import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonObject;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterSubscriberIT {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final String ERROR_MESSAGE = "Something went wrong";
private static final String CONSUMER_GROUP = "group1";
- private static final String SUCCESS_CONSUMER_ID = "consumer200";
- private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
- private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
- private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
- private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
- private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
-
- private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
-
- private static final String SUCCESS_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
- private static final String FAILING_WITH_401_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
- private static final String FAILING_WITH_403_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
- private static final String FAILING_WITH_409_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
- private static final String FAILING_WITH_429_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
- private static final String FAILING_WITH_500_RESP_PATH = String
- .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
-
- private static MessageRouterSubscribeRequest mrSuccessRequest;
- private static MessageRouterSubscribeRequest mrFailingRequest;
- private MessageRouterSubscriber sut = DmaapClientFactory
+ private static final String CONSUMER_ID = "consumer200";
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
+ "{" +
+ "\"mrstatus\":3001," +
+ "\"helpURL\":\"http://onap.readthedocs.io\"," +
+ "\"message\":\"No such topic exists.-[%s]\"," +
+ "\"status\":404" +
+ "}";
+
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+
+ private static String EVENTS_PATH;
+
+ private MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static MessageRouterSource sourceDefinition;
@BeforeAll
static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
-
- sourceDefinition = createMessageRouterSource(server);
-
- mrSuccessRequest = createSuccessRequest();
-
- mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
- }
-
- @Test
- void subscriber_shouldGetCorrectResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrSuccessRequest);
-
- List<String> expectedItems = List.of("I", "like", "pizza");
-
- MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
- .builder()
- .items(expectedItems.map(JsonPrimitive::new))
- .build();
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
- void subscriber_shouldGetUnauthorizedErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleNoSuchTopicException() {
+ //given
+ final String topic = "newTopic";
+ final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+ String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber
+ .get(mrSubscribeRequest);
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
@@ -123,151 +94,111 @@ class MessageRouterSubscriberIT {
}
@Test
- void subscriber_shouldGetForbiddenErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
+ void subscriberShouldHandleSingleItemResponse(){
+ //given
+ final String topic = "TOPIC";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetConflictErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("409 Conflict\n%s", ERROR_MESSAGE));
-
+ void subscriber_shouldHandleMultipleItemsResponse() {
+ //given
+ final String topic = "TOPIC2";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void subscriber_shouldGetTooManyRequestsErrorResponse(){
- MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
- Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldGetInternalServerErrorResponse(){
- Mono<MessageRouterSubscribeResponse> response = sut
- .get(mrFailingRequest);
-
- MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
- .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
-
- StepVerifier.create(response)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldParseCorrectResponse() {
- final Flux<String> result = sut
- .getElements(mrSuccessRequest)
- .map(JsonElement::getAsString);
-
+ void subscriber_shouldExtractItemsFromResponse() {
+ //given
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest));
+
+ //then
StepVerifier.create(result)
- .expectNext("I", "like", "pizza")
+ .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+ .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
.expectComplete()
.verify(TIMEOUT);
}
@Test
- void subscriber_shouldParseErrorResponse(){
- Flux<String> result = sut
- .getElements(mrFailingRequest)
- .map(JsonElement::getAsString);
-
- StepVerifier.create(result)
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
-
- @Test
- void subscriber_shouldSubscribeCorrectly(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectNext("I", "like")
+ void subscriber_shouldSubscribeToTopic(){
+ //given
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+ CONSUMER_GROUP, CONSUMER_ID);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
+
+ //then
+ StepVerifier.create(result.take(2))
+ .expectNext(messages.get(0))
+ .expectNext(messages.get(1))
.expectComplete()
.verify(TIMEOUT);
}
- @Test
- void subscriber_shouldParseErrorWhenSubscribed(){
- Flux<String> subscriptionForElements = sut
- .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
- .map(JsonElement:: getAsString);
-
- StepVerifier.create(subscriptionForElements.take(2))
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- }
- private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
- return routes
- .get(SUCCESS_RESP_PATH, (req, resp) ->
- sendResource(resp, "/sample-mr-subscribe-response.json"))
- .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
- sendError(resp, 409, ERROR_MESSAGE))
- .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
- sendError(resp, 429, ERROR_MESSAGE))
- .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE));
- }
- private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
- return ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
- .build();
- }
-
- private static MessageRouterSubscribeRequest createSuccessRequest(){
- return ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(SUCCESS_CONSUMER_ID)
- .build();
- }
-
- private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
- return ImmutableMessageRouterSubscribeRequest
- .builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup(CONSUMER_GROUP)
- .consumerId(consumerId)
- .build();
- }
-
- private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
- return ImmutableMessageRouterSubscribeResponse
- .builder()
- .failReason(failReason)
- .build();
- }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
new file mode 100644
index 00000000..8695b727
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
@@ -0,0 +1,125 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import reactor.core.publisher.Flux;
+
+final class MessageRouterTestsUtils {
+ private static final JsonParser parser = new JsonParser();
+ private MessageRouterTestsUtils() {}
+
+ static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .build();
+ }
+
+ static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages){
+ return messages.map(parser::parse);
+ }
+
+ static JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
+ }
+
+ static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ }
+
+ static Flux<JsonPrimitive> plainBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(JsonPrimitive::new);
+ }
+
+ static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .failReason(String.format(failReasonFormat, formatArgs))
+ .build();
+ }
+
+ static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+ MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+ final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
+
+ publisher.put(publishRequest, jsonMessageBatch).blockLast();
+ subscriber.get(subscribeRequest).block();
+ }
+}