From 97d60b4653c265530485209e8f61f73137d521b0 Mon Sep 17 00:00:00 2001 From: Izabela Zawadzka Date: Fri, 14 Jun 2019 15:26:29 +0200 Subject: Add text/plain content type handling in Publisher Change-Id: I51e17d64f813e16b81385abb8aa862ee1f927d35 Signed-off-by: Izabela Zawadzka Issue-ID: DCAEGEN2-1630 --- .../rest/services/dmaap/client/ContentType.java | 40 ++ .../client/impl/MessageRouterPublisherImpl.java | 21 +- .../client/model/MessageRouterPublishRequest.java | 5 +- .../dmaap/client/MessageRouterTestsUtils.java | 141 +++++++ .../dmaap/client/api/MessageRouterPublisherIT.java | 117 +++++- .../client/api/MessageRouterSubscriberIT.java | 2 +- .../dmaap/client/api/MessageRouterTestsUtils.java | 125 ------- .../impl/MessageRouterPublisherImplTest.java | 416 ++++++++++++++++----- 8 files changed, 632 insertions(+), 235 deletions(-) create mode 100644 rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java create mode 100644 rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java delete mode 100644 rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java (limited to 'rest-services') diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java new file mode 100644 index 00000000..80a28d6c --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client; + +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.util.AsciiString; + +public enum ContentType { + APPLICATION_JSON(HttpHeaderValues.APPLICATION_JSON), + TEXT_PLAIN(HttpHeaderValues.TEXT_PLAIN); + + private AsciiString contentType; + + ContentType(AsciiString contentType) { + this.contentType = contentType; + } + + @Override + public String toString(){ + return contentType.toString(); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index aa88b9ee..191ec64f 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -27,6 +27,7 @@ import com.google.gson.JsonElement; import io.vavr.collection.HashMap; import io.vavr.collection.List; import java.time.Duration; +import java.util.stream.Collectors; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -35,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; @@ -74,14 +76,20 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { List batch) { LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size()); LOGGER.trace("The items to be sent: {}", batch); - return httpClient.call(buildHttpRequest(request, asJsonBody(batch))) + return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType()))) .map(httpResponse -> buildResponse(httpResponse, batch)); } - private @NotNull RequestBody asJsonBody(List subItems) { - final JsonArray elements = new JsonArray(subItems.size()); - subItems.forEach(elements::add); - return RequestBody.fromJson(elements); + private @NotNull RequestBody createBody(List subItems, ContentType contentType) { + if(contentType == ContentType.APPLICATION_JSON) { + final JsonArray elements = new JsonArray(subItems.size()); + subItems.forEach(elements::add); + return RequestBody.fromJson(elements); + }else if(contentType == ContentType.TEXT_PLAIN){ + String messages = subItems.map(JsonElement::toString) + .collect(Collectors.joining("\n")); + return RequestBody.fromString(messages); + }else throw new IllegalArgumentException("Unsupported content type: " + contentType); } private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) { @@ -89,7 +97,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .method(HttpMethod.POST) .url(request.sinkDefinition().topicUrl()) .diagnosticContext(request.diagnosticContext().withNewInvocationId()) - .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType())) + .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString())) .body(body) .build(); } @@ -98,6 +106,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { HttpResponse httpResponse, List batch) { final ImmutableMessageRouterPublishResponse.Builder builder = ImmutableMessageRouterPublishResponse.builder(); + return httpResponse.successful() ? builder.items(batch).build() : builder.failReason(extractFailReason(httpResponse)).build(); diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index 77f92e77..29904138 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; /** * @author Piotr Jaszczyk @@ -34,7 +35,7 @@ public interface MessageRouterPublishRequest extends DmaapRequest { MessageRouterSink sinkDefinition(); @Value.Default - default String contentType() { - return "application/json"; + default ContentType contentType() { + return ContentType.APPLICATION_JSON; } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java new file mode 100644 index 00000000..52946f56 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java @@ -0,0 +1,141 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import io.vavr.collection.List; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import reactor.core.publisher.Flux; + + +public final class MessageRouterTestsUtils { + private static final JsonParser parser = new JsonParser(); + private MessageRouterTestsUtils() {} + + public static MessageRouterPublishRequest createPublishRequest(String topicUrl){ + return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON); + } + + public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){ + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(contentType) + .build(); + } + + public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId) { + ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(consumerGroup) + .consumerId(consumerId) + .build(); + } + + public static List getAsJsonElements(List messages){ + return messages.map(parser::parse); + } + + public static List getAsJsonObjects(List messages){ + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + public static List getAsJsonPrimitives(List messages){ + return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive); + } + + public static JsonObject getAsJsonObject(String item){ + return new Gson().fromJson(item, JsonObject.class); + } + + public static Flux plainBatch(List messages){ + return Flux.fromIterable(getAsJsonElements(messages)); + } + + public static Flux jsonBatch(List messages){ + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){ + return ImmutableMessageRouterSubscribeResponse + .builder() + .failReason(String.format(failReasonFormat, formatArgs)) + .build(); + } + + public static MessageRouterSubscribeResponse successSubscribeResponse(List items){ + return ImmutableMessageRouterSubscribeResponse + .builder() + .items(items) + .build(); + } + + public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){ + return ImmutableMessageRouterPublishResponse + .builder() + .failReason(String.format(failReasonFormat, formatArgs)) + .build(); + } + + public static MessageRouterPublishResponse successPublishResponse(List items){ + return ImmutableMessageRouterPublishResponse + .builder() + .items(items) + .build(); + } + + public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest, + MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) { + final List sampleJsonMessages = List.of("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages); + + publisher.put(publishRequest, jsonMessageBatch).blockLast(); + subscriber.get(subscribeRequest).block(); + } +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index c746bfec..0afea748 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -20,15 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import io.vavr.collection.List; import java.time.Duration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; @@ -95,7 +95,7 @@ class MessageRouterPublisherIT { //given final String topic = "TOPIC2"; final List threePlainTextMessages = List.of("I", "like", "pizza"); - final Flux messageBatch = plainBatch(threePlainTextMessages); + final Flux messageBatch = plainBatch(threePlainTextMessages); final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic)); final MessageRouterPublishResponse expectedResponse = errorPublishResponse( DMAAP_400_ERROR_RESPONSE_FORMAT, topic); @@ -112,6 +112,7 @@ class MessageRouterPublisherIT { @Test void publisher_shouldSuccessfullyPublishSingleMessage(){ + //given final String topic = "TOPIC3"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); @@ -136,7 +137,7 @@ class MessageRouterPublisherIT { @Test void publisher_shouldSuccessfullyPublishMultipleMessages(){ - final String topic = "TOPIC4"; + final String topic = "TOPIC5"; final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); final List singleJsonMessage = List.of("{\"message\":\"message1\"}", "{\"differentMessage\":\"message2\"}"); @@ -158,4 +159,112 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } + + @Test + void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){ + //given + final String topic = "TOPIC6"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List singleJsonMessage = List.of("{\"message\":\"message1\"}"); + final List expectedItems = getAsJsonElements(singleJsonMessage); + final Flux plainBatch = plainBatch(singleJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){ + //given + final String topic = "TOPIC7"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}"); + final List expectedItems = getAsJsonElements(twoJsonMessage); + final Flux plainBatch = plainBatch(twoJsonMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){ + //given + final String topic = "TOPIC8"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List singlePlainMessage = List.of("kebab"); + final List expectedItems = getAsJsonElements(singlePlainMessage); + final Flux plainBatch = plainBatch(singlePlainMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){ + //given + final String topic = "TOPIC9"; + final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic); + + final List singlePlainMessage = List.of("I", "like", "pizza"); + final List expectedItems = getAsJsonElements(singlePlainMessage); + final Flux plainBatch = plainBatch(singlePlainMessage); + + final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); + final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); + + //when + registerTopic(publisher, publishRequest, subscriber, subscribeRequest); + Mono response = publisher + .put(publishRequest, plainBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index c2e96b58..a758b2de 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -20,7 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; -import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonElement; import com.google.gson.JsonObject; diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java deleted file mode 100644 index 8695b727..00000000 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import io.vavr.collection.List; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; -import reactor.core.publisher.Flux; - -final class MessageRouterTestsUtils { - private static final JsonParser parser = new JsonParser(); - private MessageRouterTestsUtils() {} - - static MessageRouterPublishRequest createPublishRequest(String topicUrl){ - MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() - .name("the topic") - .topicUrl(topicUrl) - .build(); - - return ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) - .build(); - } - - static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl, - String consumerGroup, String consumerId) { - ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() - .name("the topic") - .topicUrl(topicUrl) - .build(); - - return ImmutableMessageRouterSubscribeRequest - .builder() - .sourceDefinition(sourceDefinition) - .consumerGroup(consumerGroup) - .consumerId(consumerId) - .build(); - } - - static List getAsJsonElements(List messages){ - return messages.map(parser::parse); - } - - static JsonObject getAsJsonObject(String item){ - return new Gson().fromJson(item, JsonObject.class); - } - - static Flux jsonBatch(List messages){ - return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); - } - - static Flux plainBatch(List messages){ - return Flux.fromIterable(messages).map(JsonPrimitive::new); - } - - static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){ - return ImmutableMessageRouterSubscribeResponse - .builder() - .failReason(String.format(failReasonFormat, formatArgs)) - .build(); - } - - static MessageRouterSubscribeResponse successSubscribeResponse(List items){ - return ImmutableMessageRouterSubscribeResponse - .builder() - .items(items) - .build(); - } - - static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){ - return ImmutableMessageRouterPublishResponse - .builder() - .failReason(String.format(failReasonFormat, formatArgs)) - .build(); - } - - static MessageRouterPublishResponse successPublishResponse(List items){ - return ImmutableMessageRouterPublishResponse - .builder() - .items(items) - .build(); - } - - static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest, - MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) { - final List sampleJsonMessages = List.of("{\"message\":\"message1\"}", - "{\"differentMessage\":\"message2\"}"); - final Flux jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages); - - publisher.put(publishRequest, jsonMessageBatch).blockLast(); - subscriber.get(subscribeRequest).block(); - } -} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index f44e86a5..38659acd 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -26,32 +26,28 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.Gson; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderValues; +import io.vavr.collection.List; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -64,71 +60,123 @@ import reactor.test.StepVerifier; */ class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); - private static final JsonParser parser = new JsonParser(); - private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() - .name("the topic") - .topicUrl("https://dmaap-mr/TOPIC") - .build(); + private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; + private static final int MAX_BATCH_SIZE = 3; private final RxHttpClient httpClient = mock(RxHttpClient.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1)); + private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1)); private final ArgumentCaptor httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); - private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest(); - private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest(); + private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); + private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final Flux singleJsonMessageBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestTextPlain, singleJsonMessageBatch); + .put(jsonPublishRequest, singleJsonMessageBatch); responses.then().block(); // then verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST); - assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl()); + assertThat(httpRequest.url()).isEqualTo(TOPIC_URL); assertThat(httpRequest.body()).isNotNull(); assertThat(httpRequest.body().length()).isGreaterThan(0); } @Test - void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() { + void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final Flux singleJsonMessageBatch = jsonBatch(threeJsonMessages); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux jsonMessagesMaxBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, jsonMessagesMaxBatch); responses.then().block(); // then verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest); - assertThat(elementsInRequest.size()).isEqualTo(3); - assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); - assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); - assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); + } + + + + @Test + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + // given + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux plainMessagesMaxBatch = plainBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + responses.then().block(); + + // then + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + final List elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest) + .map(JsonElement::getAsJsonObject); + + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){ + // given + final List threePlainMessages = List.of("I", "like", "cookies"); + final List parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + + final Flux plainMessagesMaxBatch = plainBatch(threePlainMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + responses.then().block(); + + // then + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + final List elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest) + .map(JsonElement::getAsJsonPrimitive); + + assertThat(elementsInRequest.size()).describedAs("Http request batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(elementsInRequest, parsedThreeMessages); } @Test void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){ // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); final Flux singleJsonMessageBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, singleJsonMessageBatch); responses.then().block(); // then @@ -139,124 +187,237 @@ class MessageRouterPublisherImplTest { } @Test - void puttingLowNumberOfElementsShouldReturnSingleResponse() { + void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final Flux singleJsonMessageBatch = jsonBatch(threeJsonMessages); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + + final Flux jsonMessagesMaxBatch = jsonBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestJson, singleJsonMessageBatch); + .put(jsonPublishRequest, jsonMessagesMaxBatch); // then - StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(threeJsonMessages.get(0)), - getAsJsonObject(threeJsonMessages.get(1)), - getAsJsonObject(threeJsonMessages.get(2))); - }) - .expectComplete() - .verify(TIMEOUT); + verifySingleResponse(parsedThreeMessages, responses); } @Test - void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() { + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final List twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); - final Flux doubleJsonMessageBatch = jsonBatch(concat( - threeJsonMessages, twoJsonMessages)); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final Flux plainMessagesMaxBatch = plainBatch(threeJsonMessages); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestJson, doubleJsonMessageBatch); + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + verifySingleResponse(parsedThreeMessages, responses); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() { + // given + final List threePlainMessages = List.of("I", "like", "cookies"); + final List parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + + final Flux plainMessagesMaxBatch = plainBatch(threePlainMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(plainPublishRequest, plainMessagesMaxBatch); + + // then + verifySingleResponse(parsedThreeMessages, responses); + } + + @Test + void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { + // given + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux doubleJsonMessageBatch = jsonBatch( + threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); // then responses.then().block(); verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); - final List httpRequests = httpRequestArgumentCaptor.getAllValues(); + final List httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0)); - assertThat(firstRequest.size()).isEqualTo(3); - assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); - assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); - assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreeMessages); final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1)); - assertThat(secondRequest.size()).isEqualTo(2); - assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0)); - assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1)); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoMessages); } @Test - void puttingHighNumberOfElementsShouldReturnMoreResponses() { + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { // given - final List threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); - final List twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); - final Flux doubleJsonMessageBatch = jsonBatch(concat( - threeJsonMessages, twoJsonMessages)); + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux doublePlainMessageBatch = plainBatch( + threeJsonMessages.appendAll(twoJsonMessages)); given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux responses = cut - .put(mrRequestJson, doubleJsonMessageBatch); + .put(plainPublishRequest, doublePlainMessageBatch); + // then + responses.then().block(); + + verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); + final List httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); + assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); + + final List firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0)) + .map(JsonElement::getAsJsonObject); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreeMessages); + + final List secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) + .map(JsonElement::getAsJsonObject); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoMessages); + } + + @Test + void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() { + // given + final List threePlainMessages = List.of("I", "like", "cookies"); + final List twoPlainMessages = List.of("and", "pierogi"); + + final List parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages); + final List parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages); + + final Flux doublePlainMessageBatch = plainBatch( + threePlainMessages.appendAll(twoPlainMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + // when + final Flux responses = cut + .put(plainPublishRequest, doublePlainMessageBatch); // then - StepVerifier.create(responses) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(threeJsonMessages.get(0)), - getAsJsonObject(threeJsonMessages.get(1)), - getAsJsonObject(threeJsonMessages.get(2))); - }) - .consumeNextWith(response -> { - assertThat(response.successful()).describedAs("successful").isTrue(); - assertThat(response.items()).containsExactly( - getAsJsonObject(twoJsonMessages.get(0)), - getAsJsonObject(twoJsonMessages.get(1))); - }) - .expectComplete() - .verify(TIMEOUT); + responses.then().block(); + + verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture()); + final List httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues()); + assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); + + final List firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0)) + .map(JsonElement::getAsJsonPrimitive); + assertThat(firstRequest.size()).describedAs("Http request first batch size") + .isEqualTo(MAX_BATCH_SIZE); + assertListsContainSameElements(firstRequest, parsedThreePlainMessages); + + final List secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1)) + .map(JsonElement::getAsJsonPrimitive); + assertThat(secondRequest.size()).describedAs("Http request second batch size") + .isEqualTo(MAX_BATCH_SIZE-1); + assertListsContainSameElements(secondRequest, parsedTwoPlainMessages); } - private static List getAsMRJsonMessages(List plainTextMessages){ - return plainTextMessages.stream() - .map(message -> String.format("{\"message\":\"%s\"}", message)) - .collect(Collectors.toList()); + @Test + void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(jsonPublishRequest, doubleJsonMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); + } + + @Test + void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies")); + final List twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi")); + + final List parsedThreeMessages = getAsJsonObjects(threeJsonMessages); + final List parsedTwoMessages = getAsJsonObjects(twoJsonMessages); + + final Flux doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(plainPublishRequest, doubleJsonMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); } + @Test + void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() { + // given + final List threePlainMessages = List.of("I", "like", "cookies"); + final List twoPlainMessages = List.of("and", "pierogi"); + + final List parsedThreeMessages = getAsJsonPrimitives(threePlainMessages); + final List parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages); - private static Flux jsonBatch(List messages){ - return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); + final Flux doublePlainMessageBatch = plainBatch( + threePlainMessages.appendAll(twoPlainMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux responses = cut + .put(plainPublishRequest, doublePlainMessageBatch); + + // then + verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses); } - private static List concat(List firstList, List secondList){ - return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList()); + private static List getAsMRJsonMessages(List plainTextMessages){ + return plainTextMessages + .map(message -> String.format("{\"message\":\"%s\"}", message)); } private static HttpResponse createHttpResponse(String statusReason, int statusCode){ return ImmutableHttpResponse.builder() .statusCode(statusCode) - .url(sinkDefinition.topicUrl()) + .url(TOPIC_URL) .statusReason(statusReason) .rawBody("[]".getBytes()) .build(); } - private static MessageRouterPublishRequest createMRRPublishRequest(){ - return ImmutableMessageRouterPublishRequest - .builder() - .sinkDefinition(sinkDefinition) - .build(); - } - private String collectNonEmptyRequestBody(HttpRequest httpRequest){ final String body = Flux.from(httpRequest.body().contents()) .collect(ByteBufAllocator.DEFAULT::compositeBuffer, @@ -272,7 +433,68 @@ class MessageRouterPublisherImplTest { return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class); } - private JsonObject getAsJsonObject(String item){ - return new Gson().fromJson(item, JsonObject.class); + private List extractNonEmptyPlainRequestBody(HttpRequest httpRequest){ + return getAsJsonElements( + List.of( + collectNonEmptyRequestBody(httpRequest) + .split("\n") + ) + ); + } + + private void assertListsContainSameElements(List actualMessages, + List expectedMessages){ + for (int i = 0; i < actualMessages.size(); i++) { + assertThat(actualMessages.get(i)) + .describedAs(String.format("Http request element at position %d", i)) + .isEqualTo(expectedMessages.get(i)); + } + } + + private void assertListsContainSameElements(JsonArray actualMessages, + List expectedMessages){ + assertThat(actualMessages.size()).describedAs("Http request batch size") + .isEqualTo(expectedMessages.size()); + + for (int i = 0; i < actualMessages.size(); i++) { + assertThat(actualMessages.get(i)) + .describedAs(String.format("Http request element at position %d", i)) + .isEqualTo(expectedMessages.get(i)); + } + } + + private void verifySingleResponse(List threeMessages, + Flux responses) { + StepVerifier.create(responses) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + threeMessages.get(0), + threeMessages.get(1), + threeMessages.get(2)); + }) + .expectComplete() + .verify(TIMEOUT); + } + + private void verifyDoubleResponse(List threeMessages, + List twoMessages, Flux responses) { + + StepVerifier.create(responses) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + threeMessages.get(0), + threeMessages.get(1), + threeMessages.get(2)); + }) + .consumeNextWith(response -> { + assertThat(response.successful()).describedAs("successful").isTrue(); + assertThat(response.items()).containsExactly( + twoMessages.get(0), + twoMessages.get(1)); + }) + .expectComplete() + .verify(TIMEOUT); } } \ No newline at end of file -- cgit 1.2.3-korg