diff options
6 files changed, 338 insertions, 0 deletions
@@ -40,6 +40,7 @@ <jetbrains-annotations.version>16.0.3</jetbrains-annotations.version> <protoc-jar-maven-plugin.version>3.6.0.2</protoc-jar-maven-plugin.version> <micrometer.version>1.1.4</micrometer.version> + <testcontainers.version>1.11.2</testcontainers.version> </properties> <modules> @@ -242,6 +243,18 @@ <version>${assertj-core.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> </project>
\ No newline at end of file diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index 799a8a48..86a4ab07 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -70,5 +70,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java new file mode 100644 index 00000000..727f17ac --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java @@ -0,0 +1,27 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +class DockerComposeNotFoundException extends RuntimeException{ + DockerComposeNotFoundException(String s){ + super(s); + } +} diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java new file mode 100644 index 00000000..5b113503 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import java.io.File; +import java.net.URL; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +@Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ") +@Testcontainers +class MessageRouterSubscriberCIT { + private static final Gson gson = new Gson(); + private static final Duration TIMEOUT = Duration.ofSeconds(10); + private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904; + private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza"); + private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems) + .map(JsonPrimitive::new); + private static final String CONSUMER_GROUP = "group1"; + private static final String CONSUMER_ID = "consumer200"; + private static final String DMAAP_SERVICE_NAME = "dmaap"; + private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml"; + private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath( + MR_COMPOSE_RESOURCE_NAME); + private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + + "{" + + "\"mrstatus\":3001," + + "\"helpURL\":\"http://onap.readthedocs.io\"," + + "\"message\":\"No such topic exists.-[%s]\"," + + "\"status\":404" + + "}"; + + @Container + private static final DockerComposeContainer CONTAINER = new DockerComposeContainer( + new File(DOCKER_COMPOSE_FILE_PATH)) + .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT); + + private static String EVENTS_PATH; + + private MessageRouterPublisher publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + private MessageRouterSubscriber sut = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + + + @BeforeAll + static void setUp() { + EVENTS_PATH = String.format("http://%s:%d/events", + CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT), + CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)); + } + + @Test + void subscriber_shouldHandleNoSuchTopicException() { + //given + final String topic = "newTopic"; + final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic); + final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic); + final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .failReason(expectedFailReason) + .build(); + + //when + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSubscribeRequest); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetCorrectResponse() { + //given + final String topic = "TOPIC"; + final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain"); + final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic); + final JsonArray expectedItems = getAsJsonArray(messageBatchItems); + final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems) + .build(); + + //when + registerTopic(publishRequest, subscribeRequest); + Mono<MessageRouterSubscribeResponse> response = publisher + .put(publishRequest, messageBatch) + .then(sut.get(subscribeRequest)); + + //then + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + private static String getDockerComposeFilePath(String resourceName){ + URL resource = MessageRouterSubscriberCIT.class.getClassLoader() + .getResource(resourceName); + + if(resource != null) return resource.getFile(); + else throw new DockerComposeNotFoundException(String + .format("File %s does not exist", resourceName)); + } + + private static MessageRouterPublishRequest createMRPublishRequest(String topic, + String contentType) { + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(String.format("%s/%s", EVENTS_PATH, topic)) + .build(); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(contentType) + .build(); + } + + private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) { + ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() + .name("the topic") + .topicUrl(String.format("%s/%s", EVENTS_PATH, topic)) + .build(); + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(CONSUMER_GROUP) + .consumerId(CONSUMER_ID) + .build(); + } + + private void registerTopic(MessageRouterPublishRequest publishRequest, + MessageRouterSubscribeRequest subscribeRequest) { + Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new); + + publisher.put(publishRequest, sampleMessage).blockLast(); + sut.get(subscribeRequest).block(); + } + + private JsonArray getAsJsonArray(List<String> list) { + String listsJsonString = gson.toJson(list); + return new JsonParser().parse(listsJsonString).getAsJsonArray(); + } +} diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties new file mode 100644 index 00000000..68b0f1e6 --- /dev/null +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties @@ -0,0 +1,51 @@ +############################################################################### +# ============LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright � 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +############################################################################### + +authentication.adminSecret=fe3cCompound +kafka.max.poll.interval.ms=300000 +kafka.heartbeat.interval.ms=60000 +kafka.session.timeout.ms=240000 +kafka.max.poll.records=1000 +msgRtr.namespace.aaf=org.onap.dmaap.mr.topic +msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic: +enforced.topic.name.AAF=org.onap.dmaap.mr +forceAAF=false +transidUEBtopicreqd=false +defaultNSforUEB=org.onap.dmaap.mr +consumer.timeout=17 +maxcontentlength=10000 + + +############################################################################### +## Kafka Connection + +config.zk.servers=zookeeper +kafka.metadata.broker.list=kafka:9092 +consumer.timeout.ms=100 +zookeeper.connection.timeout.ms=6000 +zookeeper.session.timeout.ms=20000 +zookeeper.sync.time.ms=2000 +auto.commit.interval.ms=1000 +fetch.message.max.bytes =1000000 +auto.commit.enable=false +kafka.rebalance.backoff.ms=10000 +kafka.rebalance.max.retries=6
\ No newline at end of file diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml new file mode 100644 index 00000000..85465cbf --- /dev/null +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -0,0 +1,48 @@ +version: '2' +services: + zookeeper: + image: nexus3.onap.org:10001/onap/dmaap/zookeeper:6.0.0 + ports: + - "2181:2181" + + kafka: + image: nexus3.onap.org:10001/onap/dmaap/kafka111:1.0.1 + ports: + - "9092:9092" + environment: + # For creating authenticated topics add AAF locate aplication ip address to host alias aaf-onap-test.osaaf.org + # For creating the authenticated topics enable the following property + enableCadi: 'false' + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LOG_DIRS: /opt/kafka/data + KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000 + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000 + # Uncomment the following lines to create authenticated topics + #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT + #KAFKA_ADVERTISED_LISTENERS: INTERNAL_SASL_PLAINTEXT://kafka:9092 + #KAFKA_LISTENERS: INTERNAL_SASL_PLAINTEXT://0.0.0.0:9092 + #KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_SASL_PLAINTEXT + #KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + #KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + #KAFKA_AUTHORIZER_CLASS_NAME: org.onap.dmaap.kafkaAuthorize.KafkaCustomAuthorizer + #aaf_locate_url: https://aaf-onap-test.osaaf.org:8095 + # Remove the following 4 lines to create authenticated topics + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT + volumes: + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zookeeper + + dmaap: + image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.14 + ports: + - "3904:3904" + - "3905:3905" + volumes: + - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties + depends_on: + - zookeeper + - kafka |