summaryrefslogtreecommitdiffstats
path: root/rest-services/dmaap-client/src
diff options
context:
space:
mode:
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-05-20 12:22:07 +0200
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-05-28 11:12:53 +0200
commit7e893708bbdc36698a5d90502b318a5fd4f3be21 (patch)
treec07793027bd67f42cea070ade1bf6c511d5fd3bd /rest-services/dmaap-client/src
parent47375ee6aa3052cee8f1ee0b796ddf6baf971c69 (diff)
Add sample tests that runs DMaap MR in container
Testcontainers framework can be used to test integration with Dmaap MR Change-Id: I5ef7b792226da09eb7cc0ea441f528e236e41053 Issue-ID: DCAEGEN2-1512 Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Diffstat (limited to 'rest-services/dmaap-client/src')
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java27
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java191
-rw-r--r--rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties51
-rw-r--r--rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml48
4 files changed, 317 insertions, 0 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java
new file mode 100644
index 00000000..727f17ac
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DockerComposeNotFoundException.java
@@ -0,0 +1,27 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+class DockerComposeNotFoundException extends RuntimeException{
+ DockerComposeNotFoundException(String s){
+ super(s);
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
new file mode 100644
index 00000000..5b113503
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
@@ -0,0 +1,191 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import java.io.File;
+import java.net.URL;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+@Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ")
+@Testcontainers
+class MessageRouterSubscriberCIT {
+ private static final Gson gson = new Gson();
+ private static final Duration TIMEOUT = Duration.ofSeconds(10);
+ private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+ private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza");
+ private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems)
+ .map(JsonPrimitive::new);
+ private static final String CONSUMER_GROUP = "group1";
+ private static final String CONSUMER_ID = "consumer200";
+ private static final String DMAAP_SERVICE_NAME = "dmaap";
+ private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
+ MR_COMPOSE_RESOURCE_NAME);
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
+ "{" +
+ "\"mrstatus\":3001," +
+ "\"helpURL\":\"http://onap.readthedocs.io\"," +
+ "\"message\":\"No such topic exists.-[%s]\"," +
+ "\"status\":404" +
+ "}";
+
+ @Container
+ private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
+ new File(DOCKER_COMPOSE_FILE_PATH))
+ .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
+
+ private static String EVENTS_PATH;
+
+ private MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ private MessageRouterSubscriber sut = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+
+
+ @BeforeAll
+ static void setUp() {
+ EVENTS_PATH = String.format("http://%s:%d/events",
+ CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
+ CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
+ }
+
+ @Test
+ void subscriber_shouldHandleNoSuchTopicException() {
+ //given
+ final String topic = "newTopic";
+ final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
+ final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
+ final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(expectedFailReason)
+ .build();
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSubscribeRequest);
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetCorrectResponse() {
+ //given
+ final String topic = "TOPIC";
+ final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain");
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+ final JsonArray expectedItems = getAsJsonArray(messageBatchItems);
+ final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems)
+ .build();
+
+ //when
+ registerTopic(publishRequest, subscribeRequest);
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, messageBatch)
+ .then(sut.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ private static String getDockerComposeFilePath(String resourceName){
+ URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
+ .getResource(resourceName);
+
+ if(resource != null) return resource.getFile();
+ else throw new DockerComposeNotFoundException(String
+ .format("File %s does not exist", resourceName));
+ }
+
+ private static MessageRouterPublishRequest createMRPublishRequest(String topic,
+ String contentType) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(contentType)
+ .build();
+ }
+
+ private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
+ ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(CONSUMER_GROUP)
+ .consumerId(CONSUMER_ID)
+ .build();
+ }
+
+ private void registerTopic(MessageRouterPublishRequest publishRequest,
+ MessageRouterSubscribeRequest subscribeRequest) {
+ Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new);
+
+ publisher.put(publishRequest, sampleMessage).blockLast();
+ sut.get(subscribeRequest).block();
+ }
+
+ private JsonArray getAsJsonArray(List<String> list) {
+ String listsJsonString = gson.toJson(list);
+ return new JsonParser().parse(listsJsonString).getAsJsonArray();
+ }
+}
diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties
new file mode 100644
index 00000000..68b0f1e6
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/MsgRtrApi.properties
@@ -0,0 +1,51 @@
+###############################################################################
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright � 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+
+authentication.adminSecret=fe3cCompound
+kafka.max.poll.interval.ms=300000
+kafka.heartbeat.interval.ms=60000
+kafka.session.timeout.ms=240000
+kafka.max.poll.records=1000
+msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
+msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
+enforced.topic.name.AAF=org.onap.dmaap.mr
+forceAAF=false
+transidUEBtopicreqd=false
+defaultNSforUEB=org.onap.dmaap.mr
+consumer.timeout=17
+maxcontentlength=10000
+
+
+###############################################################################
+## Kafka Connection
+
+config.zk.servers=zookeeper
+kafka.metadata.broker.list=kafka:9092
+consumer.timeout.ms=100
+zookeeper.connection.timeout.ms=6000
+zookeeper.session.timeout.ms=20000
+zookeeper.sync.time.ms=2000
+auto.commit.interval.ms=1000
+fetch.message.max.bytes =1000000
+auto.commit.enable=false
+kafka.rebalance.backoff.ms=10000
+kafka.rebalance.max.retries=6 \ No newline at end of file
diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
new file mode 100644
index 00000000..85465cbf
--- /dev/null
+++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
@@ -0,0 +1,48 @@
+version: '2'
+services:
+ zookeeper:
+ image: nexus3.onap.org:10001/onap/dmaap/zookeeper:6.0.0
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: nexus3.onap.org:10001/onap/dmaap/kafka111:1.0.1
+ ports:
+ - "9092:9092"
+ environment:
+ # For creating authenticated topics add AAF locate aplication ip address to host alias aaf-onap-test.osaaf.org
+ # For creating the authenticated topics enable the following property
+ enableCadi: 'false'
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LOG_DIRS: /opt/kafka/data
+ KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000
+ KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000
+ # Uncomment the following lines to create authenticated topics
+ #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT
+ #KAFKA_ADVERTISED_LISTENERS: INTERNAL_SASL_PLAINTEXT://kafka:9092
+ #KAFKA_LISTENERS: INTERNAL_SASL_PLAINTEXT://0.0.0.0:9092
+ #KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_SASL_PLAINTEXT
+ #KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+ #KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+ #KAFKA_AUTHORIZER_CLASS_NAME: org.onap.dmaap.kafkaAuthorize.KafkaCustomAuthorizer
+ #aaf_locate_url: https://aaf-onap-test.osaaf.org:8095
+ # Remove the following 4 lines to create authenticated topics
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092
+ KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ depends_on:
+ - zookeeper
+
+ dmaap:
+ image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.14
+ ports:
+ - "3904:3904"
+ - "3905:3905"
+ volumes:
+ - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties
+ depends_on:
+ - zookeeper
+ - kafka