diff options
author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-06-14 15:26:29 +0200 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-06-26 09:23:52 +0200 |
commit | 97d60b4653c265530485209e8f61f73137d521b0 (patch) | |
tree | 21bbbfe4d077df3062d0f9b8176b94a0941cea4f /rest-services | |
parent | 3ec6e99c20075386b639f33f27bcd3a3b3e5706d (diff) |
Add text/plain content type handling in Publisher
Change-Id: I51e17d64f813e16b81385abb8aa862ee1f927d35
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1630
Diffstat (limited to 'rest-services')
7 files changed, 522 insertions, 125 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java new file mode 100644 index 00000000..80a28d6c --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client; + +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.util.AsciiString; + +public enum ContentType { + APPLICATION_JSON(HttpHeaderValues.APPLICATION_JSON), + TEXT_PLAIN(HttpHeaderValues.TEXT_PLAIN); + + private AsciiString contentType; + + ContentType(AsciiString contentType) { + this.contentType = contentType; + } + + @Override + public String toString(){ + return contentType.toString(); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index aa88b9ee..191ec64f 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -27,6 +27,7 @@ import com.google.gson.JsonElement; import io.vavr.collection.HashMap; import io.vavr.collection.List; import java.time.Duration; +import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -35,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; @@ -74,14 +76,20 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { List<JsonElement> batch) { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); - return httpClient.call(buildHttpRequest(request, asJsonBody(batch))) + return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)); } - private @NotNull RequestBody asJsonBody(List<? extends JsonElement> subItems) { - final JsonArray elements = new JsonArray(subItems.size()); - subItems.forEach(elements::add); - return RequestBody.fromJson(elements); + private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) { + if(contentType == ContentType.APPLICATION_JSON) { + final JsonArray elements = new JsonArray(subItems.size()); + subItems.forEach(elements::add); + return RequestBody.fromJson(elements); + }else if(contentType == ContentType.TEXT_PLAIN){ + String messages = subItems.map(JsonElement::toString) + .collect(Collectors.joining("\n")); + return RequestBody.fromString(messages); + }else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { @@ -89,7 +97,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType())) + .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) .body(body) .build(); } @@ -98,6 +106,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { HttpResponse httpResponse, List<JsonElement> batch) { final ImmutableMessageRouterPublishResponse.Builder builder = ImmutableMessageRouterPublishResponse.builder(); + return httpResponse.successful() ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 77f92e77..29904138 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -34,7 +35,7 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); @Value.Default - default String contentType() { - return "application/json"; + default ContentType contentType() { + return ContentType.APPLICATION_JSON; } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java index 8695b727..52946f56 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client; import com.google.gson.Gson; import com.google.gson.JsonElement; @@ -29,6 +29,8 @@ import io.vavr.collection.List; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; @@ -39,11 +41,16 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import reactor.core.publisher.Flux; -final class MessageRouterTestsUtils { + +public final class MessageRouterTestsUtils { private static final JsonParser parser = new JsonParser(); private MessageRouterTestsUtils() {} - static MessageRouterPublishRequest createPublishRequest(String topicUrl){ + public static MessageRouterPublishRequest createPublishRequest(String topicUrl){ + return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON); + } + + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(topicUrl) @@ -51,10 +58,11 @@ final class MessageRouterTestsUtils { return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) + .contentType(contentType) .build(); } - static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, + public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, String consumerGroup, String consumerId) { ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() .name("the topic") @@ -69,51 +77,59 @@ final class MessageRouterTestsUtils { .build(); } - static List<JsonElement> getAsJsonElements(List<String> messages){ + public static List<JsonElement> getAsJsonElements(List<String> messages){ return messages.map(parser::parse); } - static JsonObject getAsJsonObject(String item){ + public static List<JsonObject> getAsJsonObjects(List<String> messages){ + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){ + return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive); + } + + public static JsonObject getAsJsonObject(String item){ return new Gson().fromJson(item, JsonObject.class); } - static Flux<JsonObject> jsonBatch(List<String> messages){ - return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); + public static Flux<JsonElement> plainBatch(List<String> messages){ + return Flux.fromIterable(getAsJsonElements(messages)); } - static Flux<JsonPrimitive> plainBatch(List<String> messages){ - return Flux.fromIterable(messages).map(JsonPrimitive::new); + public static Flux<JsonObject> jsonBatch(List<String> messages){ + return Flux.fromIterable(getAsJsonObjects(messages)); } - static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){ + public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){ return ImmutableMessageRouterSubscribeResponse .builder() .failReason(String.format(failReasonFormat, formatArgs)) .build(); } - static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){ + public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){ return ImmutableMessageRouterSubscribeResponse .builder() .items(items) .build(); } - static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){ + public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){ return ImmutableMessageRouterPublishResponse .builder() .failReason(String.format(failReasonFormat, formatArgs)) .build(); } - static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){ + public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){ return ImmutableMessageRouterPublishResponse .builder() .items(items) .build(); } - static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest, + public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest, MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) { final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}", "{\"differentMessage\":\"message2\"}"); diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index c746bfec..0afea748 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -20,15 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import io.vavr.collection.List; import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; @@ -95,7 +95,7 @@ class MessageRouterPublisherIT { //given final String topic = "TOPIC2"; final List<String> threePlainTextMessages = List.of("I", "like", "pizza"); - final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages); + final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages); final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic)); final MessageRouterPublishResponse expectedResponse = errorPublishResponse( DMAAP_400_ERROR_RESPONSE_FORMAT, topic); @@ -112,6 +112,7 @@ class MessageRouterPublisherIT { @Test void publisher_shouldSuccessfullyPublishSingleMessage(){ + //given final String topic = "TOPIC3"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); @@ -136,7 +137,7 @@ class MessageRouterPublisherIT { @Test void publisher_shouldSuccessfullyPublishMultipleMessages(){ - final String topic = "TOPIC4"; + final String topic = "TOPIC5"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}", "{\"differentMessage\":\"message2\"}"); @@ -158,4 +159,112 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } + + @Test + void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){ + //given + final String topic = "TOPIC6"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage); + final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){ + //given + final String topic = "TOPIC7"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}"); + final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage); + final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){ + //given + final String topic = "TOPIC8"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List<String> singlePlainMessage = List.of("kebab"); + final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage); + final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){ + //given + final String topic = "TOPIC9"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List<String> singlePlainMessage = List.of("I", "like", "pizza"); + final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage); + final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index c2e96b58..a758b2de 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -20,7 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonElement; import com.google.gson.JsonObject; diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index f44e86a5..38659acd 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -26,32 +26,28 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.Gson; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderValues; +import io.vavr.collection.List; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -64,71 +60,123 @@ import reactor.test.StepVerifier; */ class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); - private static final JsonParser parser = new JsonParser(); - private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() - .name("the topic") - .topicUrl("https://dmaap-mr/TOPIC") - .build(); + private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; + private static final int MAX_BATCH_SIZE = 3; private final RxHttpClient httpClient = mock(RxHttpClient.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1)); + private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1)); private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); - private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest(); - private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest(); + private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); + private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestTextPlain, singleJsonMessageBatch); + .put(jsonPublishRequest, singleJsonMessageBatch); responses.then().block(); // then verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST); - assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl()); + assertThat(httpRequest.url()).isEqualTo(TOPIC_URL); assertThat(httpRequest.body()).isNotNull(); assertThat(httpRequest.body().length()).isGreaterThan(0); } @Test - void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() { + void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, jsonMessagesMaxBatch); responses.then().block(); // then verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest); - assertThat(elementsInRequest.size()).isEqualTo(3); - assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); - assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); - assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); + } + + + + @Test + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + responses.then().block(); + + // then + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest) + .map(JsonElement::getAsJsonObject); + + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + // given + final List<String> threePlainMessages = List.of("I", "like", "cookies"); + final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + responses.then().block(); + + // then + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest) + .map(JsonElement::getAsJsonPrimitive); + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); } @Test void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){ // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, singleJsonMessageBatch); responses.then().block(); // then @@ -139,124 +187,237 @@ class MessageRouterPublisherImplTest { } @Test - void puttingLowNumberOfElementsShouldReturnSingleResponse() { + void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, jsonMessagesMaxBatch); // then - StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(threeJsonMessages.get(0)), - getAsJsonObject(threeJsonMessages.get(1)), - getAsJsonObject(threeJsonMessages.get(2))); - }) - .expectComplete() - .verify(TIMEOUT); + verifySingleResponse(parsedThreeMessages, responses); } @Test - void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() { + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); - final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat( - threeJsonMessages, twoJsonMessages)); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestJson, doubleJsonMessageBatch); + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + verifySingleResponse(parsedThreeMessages, responses); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { + // given + final List<String> threePlainMessages = List.of("I", "like", "cookies"); + final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + + final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + verifySingleResponse(parsedThreeMessages, responses); + } + + @Test + void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch( + threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); // then responses.then().block(); verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); - final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues(); + final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0)); - assertThat(firstRequest.size()).isEqualTo(3); - assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); - assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); - assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreeMessages); final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1)); - assertThat(secondRequest.size()).isEqualTo(2); - assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0)); - assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1)); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoMessages); } @Test - void puttingHighNumberOfElementsShouldReturnMoreResponses() { + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { // given - final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); - final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat( - threeJsonMessages, twoJsonMessages)); + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux<JsonElement> doublePlainMessageBatch = plainBatch( + threeJsonMessages.appendAll(twoJsonMessages)); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequestJson, doubleJsonMessageBatch); + .put(plainPublishRequest, doublePlainMessageBatch); + // then + responses.then().block(); + + verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); + final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); + assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); + + final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0)) + .map(JsonElement::getAsJsonObject); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreeMessages); + + final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) + .map(JsonElement::getAsJsonObject); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoMessages); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { + // given + final List<String> threePlainMessages = List.of("I", "like", "cookies"); + final List<String> twoPlainMessages = List.of("and", "pierogi"); + + final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages); + final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages); + + final Flux<JsonElement> doublePlainMessageBatch = plainBatch( + threePlainMessages.appendAll(twoPlainMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, doublePlainMessageBatch); // then - StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(threeJsonMessages.get(0)), - getAsJsonObject(threeJsonMessages.get(1)), - getAsJsonObject(threeJsonMessages.get(2))); - }) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(twoJsonMessages.get(0)), - getAsJsonObject(twoJsonMessages.get(1))); - }) - .expectComplete() - .verify(TIMEOUT); + responses.then().block(); + + verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); + final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); + assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); + + final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0)) + .map(JsonElement::getAsJsonPrimitive); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreePlainMessages); + + final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) + .map(JsonElement::getAsJsonPrimitive); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoPlainMessages); } - private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){ - return plainTextMessages.stream() - .map(message -> String.format("{\"message\":\"%s\"}", message)) - .collect(Collectors.toList()); + @Test + void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); + } + + @Test + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, doubleJsonMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); } + @Test + void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List<String> threePlainMessages = List.of("I", "like", "cookies"); + final List<String> twoPlainMessages = List.of("and", "pierogi"); + + final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages); - private static Flux<JsonObject> jsonBatch(List<String> messages){ - return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); + final Flux<JsonElement> doublePlainMessageBatch = plainBatch( + threePlainMessages.appendAll(twoPlainMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(plainPublishRequest, doublePlainMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); } - private static List<String> concat(List<String> firstList, List<String> secondList){ - return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList()); + private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){ + return plainTextMessages + .map(message -> String.format("{\"message\":\"%s\"}", message)); } private static HttpResponse createHttpResponse(String statusReason, int statusCode){ return ImmutableHttpResponse.builder() .statusCode(statusCode) - .url(sinkDefinition.topicUrl()) + .url(TOPIC_URL) .statusReason(statusReason) .rawBody("[]".getBytes()) .build(); } - private static MessageRouterPublishRequest createMRRPublishRequest(){ - return ImmutableMessageRouterPublishRequest - .builder() - .sinkDefinition(sinkDefinition) - .build(); - } - private String collectNonEmptyRequestBody(HttpRequest httpRequest){ final String body = Flux.from(httpRequest.body().contents()) .collect(ByteBufAllocator.DEFAULT::compositeBuffer, @@ -272,7 +433,68 @@ class MessageRouterPublisherImplTest { return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class); } - private JsonObject getAsJsonObject(String item){ - return new Gson().fromJson(item, JsonObject.class); + private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){ + return getAsJsonElements( + List.of( + collectNonEmptyRequestBody(httpRequest) + .split("\n") + ) + ); + } + + private void assertListsContainSameElements(List<? extends JsonElement> actualMessages, + List<? extends JsonElement> expectedMessages){ + for (int i = 0; i < actualMessages.size(); i++) { + assertThat(actualMessages.get(i)) + .describedAs(String.format("Http request element at position %d", i)) + .isEqualTo(expectedMessages.get(i)); + } + } + + private void assertListsContainSameElements(JsonArray actualMessages, + List<? extends JsonElement> expectedMessages){ + assertThat(actualMessages.size()).describedAs("Http request batch size") + .isEqualTo(expectedMessages.size()); + + for (int i = 0; i < actualMessages.size(); i++) { + assertThat(actualMessages.get(i)) + .describedAs(String.format("Http request element at position %d", i)) + .isEqualTo(expectedMessages.get(i)); + } + } + + private void verifySingleResponse(List<? extends JsonElement> threeMessages, + Flux<MessageRouterPublishResponse> responses) { + StepVerifier.create(responses) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + threeMessages.get(0), + threeMessages.get(1), + threeMessages.get(2)); + }) + .expectComplete() + .verify(TIMEOUT); + } + + private void verifyDoubleResponse(List<? extends JsonElement> threeMessages, + List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) { + + StepVerifier.create(responses) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + threeMessages.get(0), + threeMessages.get(1), + threeMessages.get(2)); + }) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + twoMessages.get(0), + twoMessages.get(1)); + }) + .expectComplete() + .verify(TIMEOUT); } }
\ No newline at end of file |