diff options
2 files changed, 224 insertions, 72 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java index 5b113503..1f0fdafd 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java @@ -22,15 +22,16 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.Gson; import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; import java.io.File; import java.net.URL; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; @@ -50,15 +51,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -@Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ") @Testcontainers class MessageRouterSubscriberCIT { private static final Gson gson = new Gson(); + private static final JsonParser parser = new JsonParser(); private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; - private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza"); - private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems) - .map(JsonPrimitive::new); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; private static final String DMAAP_SERVICE_NAME = "dmaap"; @@ -82,7 +80,7 @@ class MessageRouterSubscriberCIT { private MessageRouterPublisher publisher = DmaapClientFactory .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber sut = DmaapClientFactory + private MessageRouterSubscriber subscriber = DmaapClientFactory .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); @@ -105,7 +103,7 @@ class MessageRouterSubscriberCIT { .build(); //when - Mono<MessageRouterSubscribeResponse> response = sut + Mono<MessageRouterSubscribeResponse> response = subscriber .get(mrSubscribeRequest); //then @@ -116,12 +114,15 @@ class MessageRouterSubscriberCIT { } @Test - void subscriber_shouldGetCorrectResponse() { + void subscriberShouldHandleSingleItemResponse(){ //given final String topic = "TOPIC"; - final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain"); + final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); - final JsonArray expectedItems = getAsJsonArray(messageBatchItems); + + final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}"); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); + final JsonArray expectedItems = getAsJsonArray(singleJsonMessage); final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse .builder() .items(expectedItems) @@ -130,8 +131,8 @@ class MessageRouterSubscriberCIT { //when registerTopic(publishRequest, subscribeRequest); Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, messageBatch) - .then(sut.get(subscribeRequest)); + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); //then StepVerifier.create(response) @@ -140,6 +141,84 @@ class MessageRouterSubscriberCIT { .verify(); } + @Test + void subscriber_shouldHandleMultipleItemsResponse() { + //given + final String topic = "TOPIC2"; + final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); + + final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); + final JsonArray expectedItems = getAsJsonArray(twoJsonMessages); + final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems) + .build(); + + //when + registerTopic(publishRequest, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, jsonMessageBatch) + .then(subscriber.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void subscriber_shouldExtractItemsFromResponse() { + //given + final String topic = "TOPIC3"; + final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); + + final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); + + //when + registerTopic(publishRequest, subscribeRequest); + final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch) + .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString)); + + //then + StepVerifier.create(result) + .expectNext(twoJsonMessages.get(0)) + .expectNext(twoJsonMessages.get(1)) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldSubscribeToTopic(){ + //given + final String topic = "TOPIC4"; + final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); + + final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); + + //when + registerTopic(publishRequest, subscribeRequest); + final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch) + .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)) + .map(JsonElement::getAsString)); + + //then + StepVerifier.create(result.take(2)) + .expectNext(twoJsonMessages.get(0)) + .expectNext(twoJsonMessages.get(1)) + .expectComplete() + .verify(TIMEOUT); + } + private static String getDockerComposeFilePath(String resourceName){ URL resource = MessageRouterSubscriberCIT.class.getClassLoader() .getResource(resourceName); @@ -149,8 +228,7 @@ class MessageRouterSubscriberCIT { .format("File %s does not exist", resourceName)); } - private static MessageRouterPublishRequest createMRPublishRequest(String topic, - String contentType) { + private static MessageRouterPublishRequest createMRPublishRequest(String topic){ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl(String.format("%s/%s", EVENTS_PATH, topic)) @@ -158,7 +236,6 @@ class MessageRouterSubscriberCIT { return ImmutableMessageRouterPublishRequest.builder() .sinkDefinition(sinkDefinition) - .contentType(contentType) .build(); } @@ -178,14 +255,21 @@ class MessageRouterSubscriberCIT { private void registerTopic(MessageRouterPublishRequest publishRequest, MessageRouterSubscribeRequest subscribeRequest) { - Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new); + final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}", + "{\"differentMessage\":\"message2\"}"); + final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages) + .map(parser::parse).map(JsonElement::getAsJsonObject); - publisher.put(publishRequest, sampleMessage).blockLast(); - sut.get(subscribeRequest).block(); + publisher.put(publishRequest, jsonMessageBatch).blockLast(); + subscriber.get(subscribeRequest).block(); } private JsonArray getAsJsonArray(List<String> list) { String listsJsonString = gson.toJson(list); return new JsonParser().parse(listsJsonString).getAsJsonArray(); } + + private static Flux<JsonObject> jsonBatch(List<String> messages){ + return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); + } } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index f0138d2c..f44e86a5 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -27,21 +27,24 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.google.gson.Gson; import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import io.netty.buffer.ByteBuf; +import com.google.gson.JsonParser; +import com.google.gson.Gson; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; +import io.netty.handler.codec.http.HttpHeaderValues; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.mockito.verification.VerificationMode; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; @@ -60,34 +63,29 @@ import reactor.test.StepVerifier; * @since April 2019 */ class MessageRouterPublisherImplTest { - private static final Duration TIMEOUT = Duration.ofSeconds(5); - private final RxHttpClient httpClient = mock(RxHttpClient.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1)); - - private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); - private final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + private static final JsonParser parser = new JsonParser(); + private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() .name("the topic") .topicUrl("https://dmaap-mr/TOPIC") .build(); - private final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) - .build(); - private final HttpResponse httpResponse = ImmutableHttpResponse.builder() - .statusCode(200) - .statusReason("OK") - .url(sinkDefinition.topicUrl()) - .rawBody("[]".getBytes()) - .build(); + private final RxHttpClient httpClient = mock(RxHttpClient.class); + private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1)); + private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); + private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest(); + private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest(); + private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new)); + .put(mrRequestTextPlain, singleJsonMessageBatch); responses.then().block(); // then @@ -102,55 +100,81 @@ class MessageRouterPublisherImplTest { @Test void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new)); + .put(mrRequestJson, singleJsonMessageBatch); responses.then().block(); // then verify(httpClient).call(httpRequestArgumentCaptor.capture()); final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); - final JsonArray elementsInRequest = extractNonEmptyRequestBody(httpRequest); + final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest); assertThat(elementsInRequest.size()).isEqualTo(3); - assertThat(elementsInRequest.get(0).getAsString()).isEqualTo("I"); - assertThat(elementsInRequest.get(1).getAsString()).isEqualTo("like"); - assertThat(elementsInRequest.get(2).getAsString()).isEqualTo("cookies"); + assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); + assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); + assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); + } + + @Test + void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){ + // given + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); + + // when + final Flux<MessageRouterPublishResponse> responses = cut + .put(mrRequestJson, singleJsonMessageBatch); + responses.then().block(); + + // then + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue(); + assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, "")) + .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString()); } @Test void puttingLowNumberOfElementsShouldReturnSingleResponse() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new)); + .put(mrRequestJson, singleJsonMessageBatch); // then StepVerifier.create(responses) .consumeNextWith(response -> { assertThat(response.successful()).describedAs("successful").isTrue(); assertThat(response.items()).containsExactly( - new JsonPrimitive("I"), - new JsonPrimitive("like"), - new JsonPrimitive("cookies")); + getAsJsonObject(threeJsonMessages.get(0)), + getAsJsonObject(threeJsonMessages.get(1)), + getAsJsonObject(threeJsonMessages.get(2))); }) .expectComplete() .verify(TIMEOUT); } - @Test void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat( + threeJsonMessages, twoJsonMessages)); + + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new)); - + .put(mrRequestJson, doubleJsonMessageBatch); // then responses.then().block(); @@ -158,53 +182,97 @@ class MessageRouterPublisherImplTest { final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues(); assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2); - final JsonArray firstRequest = extractNonEmptyRequestBody(httpRequests.get(0)); + final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0)); assertThat(firstRequest.size()).isEqualTo(3); - assertThat(firstRequest.get(0).getAsString()).isEqualTo("I"); - assertThat(firstRequest.get(1).getAsString()).isEqualTo("like"); - assertThat(firstRequest.get(2).getAsString()).isEqualTo("cookies"); + assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0)); + assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1)); + assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2)); - final JsonArray secondRequest = extractNonEmptyRequestBody(httpRequests.get(1)); + final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1)); assertThat(secondRequest.size()).isEqualTo(2); - assertThat(secondRequest.get(0).getAsString()).isEqualTo("and"); - assertThat(secondRequest.get(1).getAsString()).isEqualTo("pierogi"); + assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0)); + assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1)); } @Test void puttingHighNumberOfElementsShouldReturnMoreResponses() { // given - given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); + final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies")); + final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi")); + final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat( + threeJsonMessages, twoJsonMessages)); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse)); // when final Flux<MessageRouterPublishResponse> responses = cut - .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new)); + .put(mrRequestJson, doubleJsonMessageBatch); // then StepVerifier.create(responses) .consumeNextWith(response -> { assertThat(response.successful()).describedAs("successful").isTrue(); assertThat(response.items()).containsExactly( - new JsonPrimitive("I"), - new JsonPrimitive("like"), - new JsonPrimitive("cookies")); + getAsJsonObject(threeJsonMessages.get(0)), + getAsJsonObject(threeJsonMessages.get(1)), + getAsJsonObject(threeJsonMessages.get(2))); }) .consumeNextWith(response -> { assertThat(response.successful()).describedAs("successful").isTrue(); assertThat(response.items()).containsExactly( - new JsonPrimitive("and"), - new JsonPrimitive("pierogi")); + getAsJsonObject(twoJsonMessages.get(0)), + getAsJsonObject(twoJsonMessages.get(1))); }) .expectComplete() .verify(TIMEOUT); } - private JsonArray extractNonEmptyRequestBody(HttpRequest httpRequest) { + private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){ + return plainTextMessages.stream() + .map(message -> String.format("{\"message\":\"%s\"}", message)) + .collect(Collectors.toList()); + } + + + private static Flux<JsonObject> jsonBatch(List<String> messages){ + return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); + } + + private static List<String> concat(List<String> firstList, List<String> secondList){ + return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList()); + } + + private static HttpResponse createHttpResponse(String statusReason, int statusCode){ + return ImmutableHttpResponse.builder() + .statusCode(statusCode) + .url(sinkDefinition.topicUrl()) + .statusReason(statusReason) + .rawBody("[]".getBytes()) + .build(); + } + + private static MessageRouterPublishRequest createMRRPublishRequest(){ + return ImmutableMessageRouterPublishRequest + .builder() + .sinkDefinition(sinkDefinition) + .build(); + } + + private String collectNonEmptyRequestBody(HttpRequest httpRequest){ final String body = Flux.from(httpRequest.body().contents()) .collect(ByteBufAllocator.DEFAULT::compositeBuffer, (byteBufs, buffer) -> byteBufs.addComponent(true, buffer)) .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8)) .block(); assertThat(body).describedAs("request body").isNotBlank(); - return new Gson().fromJson(body, JsonArray.class); + + return body; + } + + private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){ + return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class); + } + + private JsonObject getAsJsonObject(String item){ + return new Gson().fromJson(item, JsonObject.class); } }
\ No newline at end of file |