diff options
Diffstat (limited to 'rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java')
-rw-r--r-- | rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java | 269 |
1 files changed, 0 insertions, 269 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java deleted file mode 100644 index 32b77a1d..00000000 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.vavr.collection.List; -import java.io.File; -import java.net.URL; -import java.time.Duration; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -@Testcontainers -class MessageRouterSubscriberCIT { - private static final JsonParser parser = new JsonParser(); - private static final Duration TIMEOUT = Duration.ofSeconds(10); - private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; - private static final String CONSUMER_GROUP = "group1"; - private static final String CONSUMER_ID = "consumer200"; - private static final String DMAAP_SERVICE_NAME = "dmaap"; - private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml"; - private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath( - MR_COMPOSE_RESOURCE_NAME); - private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + - "{" + - "\"mrstatus\":3001," + - "\"helpURL\":\"http://onap.readthedocs.io\"," + - "\"message\":\"No such topic exists.-[%s]\"," + - "\"status\":404" + - "}"; - - @Container - private static final DockerComposeContainer CONTAINER = new DockerComposeContainer( - new File(DOCKER_COMPOSE_FILE_PATH)) - .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT); - - private static String EVENTS_PATH; - - private MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - - - @BeforeAll - static void setUp() { - EVENTS_PATH = String.format("http://%s:%d/events", - CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT), - CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); - } - - @Test - void subscriber_shouldHandleNoSuchTopicException() { - //given - final String topic = "newTopic"; - final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic); - final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic); - final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse - .builder() - .failReason(expectedFailReason) - .build(); - - //when - Mono<MessageRouterSubscribeResponse> response = subscriber - .get(mrSubscribeRequest); - - //then - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriberShouldHandleSingleItemResponse(){ - //given - final String topic = "TOPIC"; - final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); - final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); - - final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}"); - final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse); - final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage); - final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse - .builder() - .items(expectedItems) - .build(); - - //when - registerTopic(publishRequest, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - - //then - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(); - } - - @Test - void subscriber_shouldHandleMultipleItemsResponse() { - //given - final String topic = "TOPIC2"; - final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); - final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); - - final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", - "{\"differentMessage\":\"message2\"}"); - final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse); - final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); - final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse - .builder() - .items(expectedElements) - .build(); - - //when - registerTopic(publishRequest, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - - //then - StepVerifier.create(response) - .expectNext(expectedResponse) - .expectComplete() - .verify(); - } - - @Test - void subscriber_shouldExtractItemsFromResponse() { - //given - final String topic = "TOPIC3"; - final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); - final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); - - final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", - "{\"differentMessage\":\"message2\"}"); - final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); - - //when - registerTopic(publishRequest, subscribeRequest); - final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch) - .thenMany(subscriber.getElements(subscribeRequest)); - - //then - StepVerifier.create(result) - .expectNext(getAsJsonObject(twoJsonMessages.get(0))) - .expectNext(getAsJsonObject(twoJsonMessages.get(1))) - .expectComplete() - .verify(TIMEOUT); - } - - @Test - void subscriber_shouldSubscribeToTopic(){ - //given - final String topic = "TOPIC4"; - final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic); - final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); - - final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}", - "{\"differentMessage\":\"message2\"}"); - final List<JsonElement> messages = twoJsonMessages.map(parser::parse); - final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages); - - //when - registerTopic(publishRequest, subscribeRequest); - final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch) - .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))); - - //then - StepVerifier.create(result.take(2)) - .expectNext(messages.get(0)) - .expectNext(messages.get(1)) - .expectComplete() - .verify(TIMEOUT); - } - - private static String getDockerComposeFilePath(String resourceName){ - URL resource = MessageRouterSubscriberCIT.class.getClassLoader() - .getResource(resourceName); - - if(resource != null) return resource.getFile(); - else throw new DockerComposeNotFoundException(String - .format("File %s does not exist", resourceName)); - } - - private static MessageRouterPublishRequest createMRPublishRequest(String topic){ - MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() - .name("the topic") - .topicUrl(String.format("%s/%s", EVENTS_PATH, topic)) - .build(); - - return ImmutableMessageRouterPublishRequest.builder() - .sinkDefinition(sinkDefinition) - .build(); - } - - private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) { - ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() - .name("the topic") - .topicUrl(String.format("%s/%s", EVENTS_PATH, topic)) - .build(); - - return ImmutableMessageRouterSubscribeRequest - .builder() - .sourceDefinition(sourceDefinition) - .consumerGroup(CONSUMER_GROUP) - .consumerId(CONSUMER_ID) - .build(); - } - - private void registerTopic(MessageRouterPublishRequest publishRequest, - MessageRouterSubscribeRequest subscribeRequest) { - final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}", - "{\"differentMessage\":\"message2\"}"); - final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages); - - publisher.put(publishRequest, jsonMessageBatch).blockLast(); - subscriber.get(subscribeRequest).block(); - } - - private static Flux<JsonObject> jsonBatch(List<String> messages){ - return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject); - } - - private JsonObject getAsJsonObject(String item){ - return new Gson().fromJson(item, JsonObject.class); - } -} |