aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
diff options
context:
space:
mode:
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java207
1 files changed, 74 insertions, 133 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index 9fbd63c8..c746bfec 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -20,97 +20,68 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
class MessageRouterPublisherIT {
-
- private static final String ERROR_MESSAGE = "Something went wrong";
- private static final String TEXT_PLAIN_CONTENT_TYPE = "text/plain";
- private static final String JSON_CONTENT_TYPE = "application/json";
- private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
- private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
- private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
- private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
- private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
- private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+ @Container
+ private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
- private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
- .map(JsonPrimitive::new);
- private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
- private MessageRouterPublisher sut = DmaapClientFactory
+ private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ + "{"
+ + "\"mrstatus\":5007,"
+ + "\"helpURL\":\"http://onap.readthedocs.io\","
+ + "\"message\":\"Error while publishing data to topic.:%s."
+ + "Successfully published number of messages :0."
+ + "Expected { to start an object.\",\"status\":400"
+ + "}";
+ private static String EVENTS_PATH;
+ private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@BeforeAll
static void setUp() {
- server = DummyHttpServer.start(routes ->
- routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
- .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
- sendError(resp, 400, ERROR_MESSAGE))
- .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
- sendError(resp, 401, ERROR_MESSAGE))
- .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
- sendError(resp, 403, ERROR_MESSAGE))
- .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
- sendError(resp, 404, ERROR_MESSAGE))
- .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
- sendError(resp, 500, ERROR_MESSAGE))
- );
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+ DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
}
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
-
+ final String topic = "TOPIC";
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
//when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleBadRequestError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
- JSON_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "400 Bad Request\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -120,34 +91,17 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleUnauthorizedError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "401 Unauthorized\n%s", ERROR_MESSAGE);
-
- //when
- final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
- //then
- StepVerifier.create(result)
- .expectNext(expectedResponse)
- .expectComplete()
- .verify(TIMEOUT);
- }
-
- @Test
- void publisher_shouldHandleForbiddenError(){
+ void publisher_shouldHandleBadRequestError(){
//given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "403 Forbidden\n%s", ERROR_MESSAGE);
+ final String topic = "TOPIC2";
+ final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
+ final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+ final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
//then
StepVerifier.create(result)
@@ -157,64 +111,51 @@ class MessageRouterPublisherIT {
}
@Test
- void publisher_shouldHandleNotFoundError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "404 Not Found\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishSingleMessage(){
+ final String topic = "TOPIC3";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
@Test
- void publisher_shouldHandleInternalServerError(){
- //given
- final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
- TEXT_PLAIN_CONTENT_TYPE);
- final MessageRouterPublishResponse expectedResponse = createErrorResponse(
- "500 Internal Server Error\n%s", ERROR_MESSAGE);
+ void publisher_shouldSuccessfullyPublishMultipleMessages(){
+ final String topic = "TOPIC4";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- final Flux<MessageRouterPublishResponse> result = sut
- .put(mrRequest, messageBatch);
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
- StepVerifier.create(result)
+ StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
- }
-
-
- private MessageRouterPublishRequest createMRRequest(String topicPath, String contentType){
- final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
- topicPath)
- )
- .build();
-
- return ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .contentType(contentType)
- .build();
- }
-
- private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs){
- return ImmutableMessageRouterPublishResponse
- .builder()
- .failReason(String.format(failReasonFormat, formatArgs))
- .build();
+ .verify();
}
}