summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java122
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java174
2 files changed, 224 insertions, 72 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
index 5b113503..1f0fdafd 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
@@ -22,15 +22,16 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonPrimitive;
import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
@@ -50,15 +51,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-@Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ")
@Testcontainers
class MessageRouterSubscriberCIT {
private static final Gson gson = new Gson();
+ private static final JsonParser parser = new JsonParser();
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
- private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza");
- private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems)
- .map(JsonPrimitive::new);
private static final String CONSUMER_GROUP = "group1";
private static final String CONSUMER_ID = "consumer200";
private static final String DMAAP_SERVICE_NAME = "dmaap";
@@ -82,7 +80,7 @@ class MessageRouterSubscriberCIT {
private MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber sut = DmaapClientFactory
+ private MessageRouterSubscriber subscriber = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
@@ -105,7 +103,7 @@ class MessageRouterSubscriberCIT {
.build();
//when
- Mono<MessageRouterSubscribeResponse> response = sut
+ Mono<MessageRouterSubscribeResponse> response = subscriber
.get(mrSubscribeRequest);
//then
@@ -116,12 +114,15 @@ class MessageRouterSubscriberCIT {
}
@Test
- void subscriber_shouldGetCorrectResponse() {
+ void subscriberShouldHandleSingleItemResponse(){
//given
final String topic = "TOPIC";
- final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain");
+ final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
- final JsonArray expectedItems = getAsJsonArray(messageBatchItems);
+
+ final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
.builder()
.items(expectedItems)
@@ -130,8 +131,8 @@ class MessageRouterSubscriberCIT {
//when
registerTopic(publishRequest, subscribeRequest);
Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, messageBatch)
- .then(sut.get(subscribeRequest));
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
//then
StepVerifier.create(response)
@@ -140,6 +141,84 @@ class MessageRouterSubscriberCIT {
.verify();
}
+ @Test
+ void subscriber_shouldHandleMultipleItemsResponse() {
+ //given
+ final String topic = "TOPIC2";
+ final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+ final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+ final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
+ final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems)
+ .build();
+
+ //when
+ registerTopic(publishRequest, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void subscriber_shouldExtractItemsFromResponse() {
+ //given
+ final String topic = "TOPIC3";
+ final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+ final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publishRequest, subscribeRequest);
+ final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(twoJsonMessages.get(0))
+ .expectNext(twoJsonMessages.get(1))
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldSubscribeToTopic(){
+ //given
+ final String topic = "TOPIC4";
+ final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+ final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+ //when
+ registerTopic(publishRequest, subscribeRequest);
+ final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
+ .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
+ .map(JsonElement::getAsString));
+
+ //then
+ StepVerifier.create(result.take(2))
+ .expectNext(twoJsonMessages.get(0))
+ .expectNext(twoJsonMessages.get(1))
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
private static String getDockerComposeFilePath(String resourceName){
URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
.getResource(resourceName);
@@ -149,8 +228,7 @@ class MessageRouterSubscriberCIT {
.format("File %s does not exist", resourceName));
}
- private static MessageRouterPublishRequest createMRPublishRequest(String topic,
- String contentType) {
+ private static MessageRouterPublishRequest createMRPublishRequest(String topic){
MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
@@ -158,7 +236,6 @@ class MessageRouterSubscriberCIT {
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
- .contentType(contentType)
.build();
}
@@ -178,14 +255,21 @@ class MessageRouterSubscriberCIT {
private void registerTopic(MessageRouterPublishRequest publishRequest,
MessageRouterSubscribeRequest subscribeRequest) {
- Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new);
+ final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
+ .map(parser::parse).map(JsonElement::getAsJsonObject);
- publisher.put(publishRequest, sampleMessage).blockLast();
- sut.get(subscribeRequest).block();
+ publisher.put(publishRequest, jsonMessageBatch).blockLast();
+ subscriber.get(subscribeRequest).block();
}
private JsonArray getAsJsonArray(List<String> list) {
String listsJsonString = gson.toJson(list);
return new JsonParser().parse(listsJsonString).getAsJsonArray();
}
+
+ private static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ }
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index f0138d2c..f44e86a5 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -27,21 +27,24 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.google.gson.Gson;
import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import io.netty.buffer.ByteBuf;
+import com.google.gson.JsonParser;
+import com.google.gson.Gson;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
+import io.netty.handler.codec.http.HttpHeaderValues;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.verification.VerificationMode;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
@@ -60,34 +63,29 @@ import reactor.test.StepVerifier;
* @since April 2019
*/
class MessageRouterPublisherImplTest {
-
private static final Duration TIMEOUT = Duration.ofSeconds(5);
- private final RxHttpClient httpClient = mock(RxHttpClient.class);
- private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
-
- private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
- private final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ private static final JsonParser parser = new JsonParser();
+ private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl("https://dmaap-mr/TOPIC")
.build();
- private final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder()
- .sinkDefinition(sinkDefinition)
- .build();
- private final HttpResponse httpResponse = ImmutableHttpResponse.builder()
- .statusCode(200)
- .statusReason("OK")
- .url(sinkDefinition.topicUrl())
- .rawBody("[]".getBytes())
- .build();
+ private final RxHttpClient httpClient = mock(RxHttpClient.class);
+ private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
+ private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+ private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest();
+ private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest();
+ private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+ .put(mrRequestTextPlain, singleJsonMessageBatch);
responses.then().block();
// then
@@ -102,55 +100,81 @@ class MessageRouterPublisherImplTest {
@Test
void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+ .put(mrRequestJson, singleJsonMessageBatch);
responses.then().block();
// then
verify(httpClient).call(httpRequestArgumentCaptor.capture());
final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
- final JsonArray elementsInRequest = extractNonEmptyRequestBody(httpRequest);
+ final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
assertThat(elementsInRequest.size()).isEqualTo(3);
- assertThat(elementsInRequest.get(0).getAsString()).isEqualTo("I");
- assertThat(elementsInRequest.get(1).getAsString()).isEqualTo("like");
- assertThat(elementsInRequest.get(2).getAsString()).isEqualTo("cookies");
+ assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
+ assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
+ assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+ }
+
+ @Test
+ void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(mrRequestJson, singleJsonMessageBatch);
+ responses.then().block();
+
+ // then
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
+ .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
}
@Test
void puttingLowNumberOfElementsShouldReturnSingleResponse() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+ .put(mrRequestJson, singleJsonMessageBatch);
// then
StepVerifier.create(responses)
.consumeNextWith(response -> {
assertThat(response.successful()).describedAs("successful").isTrue();
assertThat(response.items()).containsExactly(
- new JsonPrimitive("I"),
- new JsonPrimitive("like"),
- new JsonPrimitive("cookies"));
+ getAsJsonObject(threeJsonMessages.get(0)),
+ getAsJsonObject(threeJsonMessages.get(1)),
+ getAsJsonObject(threeJsonMessages.get(2)));
})
.expectComplete()
.verify(TIMEOUT);
}
-
@Test
void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
+ threeJsonMessages, twoJsonMessages));
+
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
-
+ .put(mrRequestJson, doubleJsonMessageBatch);
// then
responses.then().block();
@@ -158,53 +182,97 @@ class MessageRouterPublisherImplTest {
final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
- final JsonArray firstRequest = extractNonEmptyRequestBody(httpRequests.get(0));
+ final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
assertThat(firstRequest.size()).isEqualTo(3);
- assertThat(firstRequest.get(0).getAsString()).isEqualTo("I");
- assertThat(firstRequest.get(1).getAsString()).isEqualTo("like");
- assertThat(firstRequest.get(2).getAsString()).isEqualTo("cookies");
+ assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
+ assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
+ assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
- final JsonArray secondRequest = extractNonEmptyRequestBody(httpRequests.get(1));
+ final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
assertThat(secondRequest.size()).isEqualTo(2);
- assertThat(secondRequest.get(0).getAsString()).isEqualTo("and");
- assertThat(secondRequest.get(1).getAsString()).isEqualTo("pierogi");
+ assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0));
+ assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1));
}
@Test
void puttingHighNumberOfElementsShouldReturnMoreResponses() {
// given
- given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+ final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
+ threeJsonMessages, twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
// when
final Flux<MessageRouterPublishResponse> responses = cut
- .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
+ .put(mrRequestJson, doubleJsonMessageBatch);
// then
StepVerifier.create(responses)
.consumeNextWith(response -> {
assertThat(response.successful()).describedAs("successful").isTrue();
assertThat(response.items()).containsExactly(
- new JsonPrimitive("I"),
- new JsonPrimitive("like"),
- new JsonPrimitive("cookies"));
+ getAsJsonObject(threeJsonMessages.get(0)),
+ getAsJsonObject(threeJsonMessages.get(1)),
+ getAsJsonObject(threeJsonMessages.get(2)));
})
.consumeNextWith(response -> {
assertThat(response.successful()).describedAs("successful").isTrue();
assertThat(response.items()).containsExactly(
- new JsonPrimitive("and"),
- new JsonPrimitive("pierogi"));
+ getAsJsonObject(twoJsonMessages.get(0)),
+ getAsJsonObject(twoJsonMessages.get(1)));
})
.expectComplete()
.verify(TIMEOUT);
}
- private JsonArray extractNonEmptyRequestBody(HttpRequest httpRequest) {
+ private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+ return plainTextMessages.stream()
+ .map(message -> String.format("{\"message\":\"%s\"}", message))
+ .collect(Collectors.toList());
+ }
+
+
+ private static Flux<JsonObject> jsonBatch(List<String> messages){
+ return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+ }
+
+ private static List<String> concat(List<String> firstList, List<String> secondList){
+ return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList());
+ }
+
+ private static HttpResponse createHttpResponse(String statusReason, int statusCode){
+ return ImmutableHttpResponse.builder()
+ .statusCode(statusCode)
+ .url(sinkDefinition.topicUrl())
+ .statusReason(statusReason)
+ .rawBody("[]".getBytes())
+ .build();
+ }
+
+ private static MessageRouterPublishRequest createMRRPublishRequest(){
+ return ImmutableMessageRouterPublishRequest
+ .builder()
+ .sinkDefinition(sinkDefinition)
+ .build();
+ }
+
+ private String collectNonEmptyRequestBody(HttpRequest httpRequest){
final String body = Flux.from(httpRequest.body().contents())
.collect(ByteBufAllocator.DEFAULT::compositeBuffer,
(byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
.map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
.block();
assertThat(body).describedAs("request body").isNotBlank();
- return new Gson().fromJson(body, JsonArray.class);
+
+ return body;
+ }
+
+ private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
+ return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
+ }
+
+ private JsonObject getAsJsonObject(String item){
+ return new Gson().fromJson(item, JsonObject.class);
}
} \ No newline at end of file