aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-06-14 15:26:29 +0200
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-06-26 09:23:52 +0200
commit97d60b4653c265530485209e8f61f73137d521b0 (patch)
tree21bbbfe4d077df3062d0f9b8176b94a0941cea4f
parent3ec6e99c20075386b639f33f27bcd3a3b3e5706d (diff)
Add text/plain content type handling in Publisher
Change-Id: I51e17d64f813e16b81385abb8aa862ee1f927d35 Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com> Issue-ID: DCAEGEN2-1630
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java40
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java21
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java5
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java (renamed from rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java)46
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java117
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java2
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java416
7 files changed, 522 insertions, 125 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java
new file mode 100644
index 00000000..80a28d6c
--- /dev/null
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.util.AsciiString;
+
+public enum ContentType {
+ APPLICATION_JSON(HttpHeaderValues.APPLICATION_JSON),
+ TEXT_PLAIN(HttpHeaderValues.TEXT_PLAIN);
+
+ private AsciiString contentType;
+
+ ContentType(AsciiString contentType) {
+ this.contentType = contentType;
+ }
+
+ @Override
+ public String toString(){
+ return contentType.toString();
+ }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index aa88b9ee..191ec64f 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonElement;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import java.time.Duration;
+import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -35,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
@@ -74,14 +76,20 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
List<JsonElement> batch) {
LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
LOGGER.trace("The items to be sent: {}", batch);
- return httpClient.call(buildHttpRequest(request, asJsonBody(batch)))
+ return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
.map(httpResponse -> buildResponse(httpResponse, batch));
}
- private @NotNull RequestBody asJsonBody(List<? extends JsonElement> subItems) {
- final JsonArray elements = new JsonArray(subItems.size());
- subItems.forEach(elements::add);
- return RequestBody.fromJson(elements);
+ private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
+ if(contentType == ContentType.APPLICATION_JSON) {
+ final JsonArray elements = new JsonArray(subItems.size());
+ subItems.forEach(elements::add);
+ return RequestBody.fromJson(elements);
+ }else if(contentType == ContentType.TEXT_PLAIN){
+ String messages = subItems.map(JsonElement::toString)
+ .collect(Collectors.joining("\n"));
+ return RequestBody.fromString(messages);
+ }else throw new IllegalArgumentException("Unsupported content type: " + contentType);
}
private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
@@ -89,7 +97,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
.method(HttpMethod.POST)
.url(request.sinkDefinition().topicUrl())
.diagnosticContext(request.diagnosticContext().withNewInvocationId())
- .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
+ .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
.body(body)
.build();
}
@@ -98,6 +106,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
HttpResponse httpResponse, List<JsonElement> batch) {
final ImmutableMessageRouterPublishResponse.Builder builder =
ImmutableMessageRouterPublishResponse.builder();
+
return httpResponse.successful()
? builder.items(batch).build()
: builder.failReason(extractFailReason(httpResponse)).build();
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
index 77f92e77..29904138 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -34,7 +35,7 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
MessageRouterSink sinkDefinition();
@Value.Default
- default String contentType() {
- return "application/json";
+ default ContentType contentType() {
+ return ContentType.APPLICATION_JSON;
}
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
index 8695b727..52946f56 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
@@ -29,6 +29,8 @@ import io.vavr.collection.List;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
@@ -39,11 +41,16 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import reactor.core.publisher.Flux;
-final class MessageRouterTestsUtils {
+
+public final class MessageRouterTestsUtils {
private static final JsonParser parser = new JsonParser();
private MessageRouterTestsUtils() {}
- static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+ return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
+ }
+
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){
MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(topicUrl)
@@ -51,10 +58,11 @@ final class MessageRouterTestsUtils {
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
+ .contentType(contentType)
.build();
}
- static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+ public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
String consumerGroup, String consumerId) {
ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
.name("the topic")
@@ -69,51 +77,59 @@ final class MessageRouterTestsUtils {
.build();
}
- static List<JsonElement> getAsJsonElements(List<String> messages){
+ public static List<JsonElement> getAsJsonElements(List<String> messages){
return messages.map(parser::parse);
}
- static JsonObject getAsJsonObject(String item){
+ public static List<JsonObject> getAsJsonObjects(List<String> messages){
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
+ }
+
+ public static JsonObject getAsJsonObject(String item){
return new Gson().fromJson(item, JsonObject.class);
}
- static Flux<JsonObject> jsonBatch(List<String> messages){
- return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ public static Flux<JsonElement> plainBatch(List<String> messages){
+ return Flux.fromIterable(getAsJsonElements(messages));
}
- static Flux<JsonPrimitive> plainBatch(List<String> messages){
- return Flux.fromIterable(messages).map(JsonPrimitive::new);
+ public static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(getAsJsonObjects(messages));
}
- static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+ public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
return ImmutableMessageRouterSubscribeResponse
.builder()
.failReason(String.format(failReasonFormat, formatArgs))
.build();
}
- static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+ public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
return ImmutableMessageRouterSubscribeResponse
.builder()
.items(items)
.build();
}
- static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+ public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
return ImmutableMessageRouterPublishResponse
.builder()
.failReason(String.format(failReasonFormat, formatArgs))
.build();
}
- static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+ public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
return ImmutableMessageRouterPublishResponse
.builder()
.items(items)
.build();
}
- static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+ public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index c746bfec..0afea748 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -20,15 +20,15 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
@@ -95,7 +95,7 @@ class MessageRouterPublisherIT {
//given
final String topic = "TOPIC2";
final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
- final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+ final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
@@ -112,6 +112,7 @@ class MessageRouterPublisherIT {
@Test
void publisher_shouldSuccessfullyPublishSingleMessage(){
+ //given
final String topic = "TOPIC3";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
@@ -136,7 +137,7 @@ class MessageRouterPublisherIT {
@Test
void publisher_shouldSuccessfullyPublishMultipleMessages(){
- final String topic = "TOPIC4";
+ final String topic = "TOPIC5";
final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
"{\"differentMessage\":\"message2\"}");
@@ -158,4 +159,112 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify();
}
+
+ @Test
+ void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
+ //given
+ final String topic = "TOPIC6";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, plainBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
+ //given
+ final String topic = "TOPIC7";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, plainBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
+ //given
+ final String topic = "TOPIC8";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+ final List<String> singlePlainMessage = List.of("kebab");
+ final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, plainBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
+ //given
+ final String topic = "TOPIC9";
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+ final List<String> singlePlainMessage = List.of("I", "like", "pizza");
+ final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
+ final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ //when
+ registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, plainBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index c2e96b58..a758b2de 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -20,7 +20,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index f44e86a5..38659acd 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -26,32 +26,28 @@ import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
import com.google.gson.Gson;
+import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.vavr.collection.List;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
@@ -64,71 +60,123 @@ import reactor.test.StepVerifier;
*/
class MessageRouterPublisherImplTest {
private static final Duration TIMEOUT = Duration.ofSeconds(5);
- private static final JsonParser parser = new JsonParser();
- private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
- .name("the topic")
- .topicUrl("https://dmaap-mr/TOPIC")
- .build();
+ private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
+ private static final int MAX_BATCH_SIZE = 3;
private final RxHttpClient httpClient = mock(RxHttpClient.class);
- private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
+ private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
- private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest();
- private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest();
+ private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
+ private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestTextPlain, singleJsonMessageBatch);
+ .put(jsonPublishRequest, singleJsonMessageBatch);
responses.then().block();
// then
verify(httpClient).call(httpRequestArgumentCaptor.capture());
final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
- assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl());
+ assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
assertThat(httpRequest.body()).isNotNull();
assertThat(httpRequest.body().length()).isGreaterThan(0);
}
@Test
- void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
+ void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
- final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+ final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestJson, singleJsonMessageBatch);
+ .put(jsonPublishRequest, jsonMessagesMaxBatch);
responses.then().block();
// then
verify(httpClient).call(httpRequestArgumentCaptor.capture());
final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
- assertThat(elementsInRequest.size()).isEqualTo(3);
- assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
- assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
- assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+
+ assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
+ }
+
+
+
+ @Test
+ void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+ responses.then().block();
+
+ // then
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
+ .map(JsonElement::getAsJsonObject);
+
+
+ assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
+ }
+
+ @Test
+ void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+ // given
+ final List<String> threePlainMessages = List.of("I", "like", "cookies");
+ final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+ responses.then().block();
+
+ // then
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
+ .map(JsonElement::getAsJsonPrimitive);
+
+ assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
}
@Test
void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestJson, singleJsonMessageBatch);
+ .put(jsonPublishRequest, singleJsonMessageBatch);
responses.then().block();
// then
@@ -139,124 +187,237 @@ class MessageRouterPublisherImplTest {
}
@Test
- void puttingLowNumberOfElementsShouldReturnSingleResponse() {
+ void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
- final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+ final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestJson, singleJsonMessageBatch);
+ .put(jsonPublishRequest, jsonMessagesMaxBatch);
// then
- StepVerifier.create(responses)
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- getAsJsonObject(threeJsonMessages.get(0)),
- getAsJsonObject(threeJsonMessages.get(1)),
- getAsJsonObject(threeJsonMessages.get(2)));
- })
- .expectComplete()
- .verify(TIMEOUT);
+ verifySingleResponse(parsedThreeMessages, responses);
}
@Test
- void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
+ void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
- final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
- final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
- threeJsonMessages, twoJsonMessages));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestJson, doubleJsonMessageBatch);
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ verifySingleResponse(parsedThreeMessages, responses);
+ }
+
+ @Test
+ void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
+ // given
+ final List<String> threePlainMessages = List.of("I", "like", "cookies");
+ final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ verifySingleResponse(parsedThreeMessages, responses);
+ }
+
+ @Test
+ void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
+ threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
// then
responses.then().block();
verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
- final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
+ final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
- assertThat(firstRequest.size()).isEqualTo(3);
- assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
- assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
- assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+ assertThat(firstRequest.size()).describedAs("Http request first batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(firstRequest, parsedThreeMessages);
final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
- assertThat(secondRequest.size()).isEqualTo(2);
- assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0));
- assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1));
+ assertThat(secondRequest.size()).describedAs("Http request second batch size")
+ .isEqualTo(MAX_BATCH_SIZE-1);
+ assertListsContainSameElements(secondRequest, parsedTwoMessages);
}
@Test
- void puttingHighNumberOfElementsShouldReturnMoreResponses() {
+ void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
// given
- final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
- final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
- final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
- threeJsonMessages, twoJsonMessages));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+ threeJsonMessages.appendAll(twoJsonMessages));
given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequestJson, doubleJsonMessageBatch);
+ .put(plainPublishRequest, doublePlainMessageBatch);
+ // then
+ responses.then().block();
+
+ verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
+ final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
+ assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
+
+ final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
+ .map(JsonElement::getAsJsonObject);
+ assertThat(firstRequest.size()).describedAs("Http request first batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(firstRequest, parsedThreeMessages);
+
+ final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
+ .map(JsonElement::getAsJsonObject);
+ assertThat(secondRequest.size()).describedAs("Http request second batch size")
+ .isEqualTo(MAX_BATCH_SIZE-1);
+ assertListsContainSameElements(secondRequest, parsedTwoMessages);
+ }
+
+ @Test
+ void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
+ // given
+ final List<String> threePlainMessages = List.of("I", "like", "cookies");
+ final List<String> twoPlainMessages = List.of("and", "pierogi");
+
+ final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
+ final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
+
+ final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+ threePlainMessages.appendAll(twoPlainMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, doublePlainMessageBatch);
// then
- StepVerifier.create(responses)
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- getAsJsonObject(threeJsonMessages.get(0)),
- getAsJsonObject(threeJsonMessages.get(1)),
- getAsJsonObject(threeJsonMessages.get(2)));
- })
- .consumeNextWith(response -> {
- assertThat(response.successful()).describedAs("successful").isTrue();
- assertThat(response.items()).containsExactly(
- getAsJsonObject(twoJsonMessages.get(0)),
- getAsJsonObject(twoJsonMessages.get(1)));
- })
- .expectComplete()
- .verify(TIMEOUT);
+ responses.then().block();
+
+ verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
+ final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
+ assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
+
+ final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
+ .map(JsonElement::getAsJsonPrimitive);
+ assertThat(firstRequest.size()).describedAs("Http request first batch size")
+ .isEqualTo(MAX_BATCH_SIZE);
+ assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
+
+ final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
+ .map(JsonElement::getAsJsonPrimitive);
+ assertThat(secondRequest.size()).describedAs("Http request second batch size")
+ .isEqualTo(MAX_BATCH_SIZE-1);
+ assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
}
- private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
- return plainTextMessages.stream()
- .map(message -> String.format("{\"message\":\"%s\"}", message))
- .collect(Collectors.toList());
+ @Test
+ void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
+ }
+
+ @Test
+ void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
}
+ @Test
+ void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+ // given
+ final List<String> threePlainMessages = List.of("I", "like", "cookies");
+ final List<String> twoPlainMessages = List.of("and", "pierogi");
+
+ final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+ final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
- private static Flux<JsonObject> jsonBatch(List<String> messages){
- return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+ threePlainMessages.appendAll(twoPlainMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, doublePlainMessageBatch);
+
+ // then
+ verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
}
- private static List<String> concat(List<String> firstList, List<String> secondList){
- return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList());
+ private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+ return plainTextMessages
+ .map(message -> String.format("{\"message\":\"%s\"}", message));
}
private static HttpResponse createHttpResponse(String statusReason, int statusCode){
return ImmutableHttpResponse.builder()
.statusCode(statusCode)
- .url(sinkDefinition.topicUrl())
+ .url(TOPIC_URL)
.statusReason(statusReason)
.rawBody("[]".getBytes())
.build();
}
- private static MessageRouterPublishRequest createMRRPublishRequest(){
- return ImmutableMessageRouterPublishRequest
- .builder()
- .sinkDefinition(sinkDefinition)
- .build();
- }
-
private String collectNonEmptyRequestBody(HttpRequest httpRequest){
final String body = Flux.from(httpRequest.body().contents())
.collect(ByteBufAllocator.DEFAULT::compositeBuffer,
@@ -272,7 +433,68 @@ class MessageRouterPublisherImplTest {
return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
}
- private JsonObject getAsJsonObject(String item){
- return new Gson().fromJson(item, JsonObject.class);
+ private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
+ return getAsJsonElements(
+ List.of(
+ collectNonEmptyRequestBody(httpRequest)
+ .split("\n")
+ )
+ );
+ }
+
+ private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
+ List<? extends JsonElement> expectedMessages){
+ for (int i = 0; i < actualMessages.size(); i++) {
+ assertThat(actualMessages.get(i))
+ .describedAs(String.format("Http request element at position %d", i))
+ .isEqualTo(expectedMessages.get(i));
+ }
+ }
+
+ private void assertListsContainSameElements(JsonArray actualMessages,
+ List<? extends JsonElement> expectedMessages){
+ assertThat(actualMessages.size()).describedAs("Http request batch size")
+ .isEqualTo(expectedMessages.size());
+
+ for (int i = 0; i < actualMessages.size(); i++) {
+ assertThat(actualMessages.get(i))
+ .describedAs(String.format("Http request element at position %d", i))
+ .isEqualTo(expectedMessages.get(i));
+ }
+ }
+
+ private void verifySingleResponse(List<? extends JsonElement> threeMessages,
+ Flux<MessageRouterPublishResponse> responses) {
+ StepVerifier.create(responses)
+ .consumeNextWith(response -> {
+ assertThat(response.successful()).describedAs("successful").isTrue();
+ assertThat(response.items()).containsExactly(
+ threeMessages.get(0),
+ threeMessages.get(1),
+ threeMessages.get(2));
+ })
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
+ List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
+
+ StepVerifier.create(responses)
+ .consumeNextWith(response -> {
+ assertThat(response.successful()).describedAs("successful").isTrue();
+ assertThat(response.items()).containsExactly(
+ threeMessages.get(0),
+ threeMessages.get(1),
+ threeMessages.get(2));
+ })
+ .consumeNextWith(response -> {
+ assertThat(response.successful()).describedAs("successful").isTrue();
+ assertThat(response.items()).containsExactly(
+ twoMessages.get(0),
+ twoMessages.get(1));
+ })
+ .expectComplete()
+ .verify(TIMEOUT);
}
} \ No newline at end of file