aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services
diff options
context:
space:
mode:
authorsushant53 <sushant.jadhav@t-systems.com>2023-08-11 19:45:44 +0530
committerSushant Jadhav <sushant.jadhav@t-systems.com>2023-09-11 12:09:33 +0000
commit86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch)
tree0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services
parent9d8a9326758a162eb26236a1dd9de1c29c504554 (diff)
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs. Issue-ID: DCAEGEN2-3364 Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495 Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services')
-rw-r--r--rest-services/dmaap-client/pom.xml18
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java26
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java10
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java11
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java92
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java151
-rw-r--r--rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java154
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java2
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java159
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java94
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java90
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java182
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java27
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java13
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java19
-rw-r--r--rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml23
16 files changed, 922 insertions, 149 deletions
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml
index 8123af31..5a3ef94b 100644
--- a/rest-services/dmaap-client/pom.xml
+++ b/rest-services/dmaap-client/pom.xml
@@ -2,6 +2,7 @@
<!--
============LICENSE_START=======================================================
Copyright (c) 2022 Nokia. All rights reserved.
+Copyright (c) 2023 Deutsche Telekom AG. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy
@@ -98,5 +99,22 @@ language governing permissions and limitations under the License.
<artifactId>mockserver-client-java</artifactId>
<version>${mockserver-client.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>uk.org.webcompere</groupId>
+ <artifactId>system-stubs-jupiter</artifactId>
+ <version>1.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
index ee4f6d38..6e7f6049 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +36,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRou
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
@@ -46,26 +49,37 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.co
* @since 1.1.4
*/
public final class DmaapClientFactory {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapClientFactory.class);
private DmaapClientFactory() {
}
public static @NotNull MessageRouterPublisher createMessageRouterPublisher(
@NotNull MessageRouterPublisherConfig clientConfiguration) {
- return new MessageRouterPublisherImpl(
+ try {
+ return new MessageRouterPublisherImpl(
createHttpClient(clientConfiguration),
clientConfiguration.maxBatchSize(),
clientConfiguration.maxBatchDuration(),
new ClientErrorReasonPresenter());
+ } catch (Exception e) {
+ LOGGER.error("Error while creating the Message Router Publisher.");
+ return null;
+ }
}
public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(
@NotNull MessageRouterSubscriberConfig clientConfiguration) {
- return new MessageRouterSubscriberImpl(
- createHttpClient(clientConfiguration),
- clientConfiguration.gsonInstance(),
- new ClientErrorReasonPresenter());
+ try {
+ return new MessageRouterSubscriberImpl(
+ createHttpClient(clientConfiguration),
+ clientConfiguration.gsonInstance(),
+ new ClientErrorReasonPresenter());
+ } catch (Exception e) {
+ LOGGER.error("Error while creating the Message Router Subscriber.");
+ return null;
+ }
+
}
private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) {
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java
index 08825b4c..f98e8198 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +22,13 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
+
+import io.vavr.collection.List;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
@@ -30,5 +38,7 @@ import reactor.core.publisher.Flux;
* @since 1.1.4
*/
public interface MessageRouterPublisher {
+ void close();
+ void setKafkaProducer(Producer<String, String> kafkaProducer);
Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items);
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java
index d91535d3..fbf90d9e 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +22,14 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
+
+import io.vavr.collection.List;
+
import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import reactor.core.publisher.Flux;
@@ -33,6 +41,8 @@ import reactor.core.publisher.Mono;
*/
public interface MessageRouterSubscriber {
+ void setConsumer(Consumer<String, String> consumer);
+ void close();
Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request);
default Flux<JsonElement> getElements(MessageRouterSubscribeRequest request) {
@@ -49,4 +59,5 @@ public interface MessageRouterSubscriber {
default Flux<JsonElement> subscribeForElements(MessageRouterSubscribeRequest request, Duration period) {
return Flux.interval(period).concatMap(i->getElements(request));
}
+
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
index 9f534d8f..4ea80e48 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import io.vavr.control.Option;
+
+import org.apache.kafka.clients.admin.AdminClient;
import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since April 2019
*/
-final class Commons {
+public final class Commons {
+ static String commonInURL = "/events/";
+ static String KAFKA_PROPS_PREFIX = "kafka.";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class);
+ private static AdminClient kafkaAdminClient;
+ private static Map<String,Object> map = new HashMap<>();
+ static {
+ map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ map.put("max.poll.interval.ms", 300000);
+ map.put("heartbeat.interval.ms", 60000);
+ map.put("session.timeout.ms", 240000);
+ map.put("max.poll.records", 1000);
+ }
private Commons() {
}
@@ -67,4 +92,69 @@ final class Commons {
.map(s -> s.getBytes(StandardCharsets.UTF_8))
.getOrElse(new byte[0]);
}
+ /**
+ * Extracts the topic name from the topicUrl.
+ *
+ * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p>
+ *
+ * @param topicUrl
+ * @return topic
+ */
+ public static String getTopicFromTopicUrl(String topicUrl) {
+ if(topicUrl.endsWith("/")) {
+ return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/"));
+ }
+ return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length());
+ }
+
+ public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) {
+ Map<String, Object> propMap= getKafkaPropertiesMap(envs);
+ Properties props = new Properties();
+ propMap.forEach((k ,v) -> props.put(k, v));
+ map.forEach((k,v) -> {
+ if(!propMap.containsKey(k)) {
+ props.put(k, v);
+ }
+ });
+
+ return props;
+ }
+
+ static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){
+ Map<String, Object> propMap = new HashMap<>();
+ envs.forEach((k ,v) -> {
+ if(k.startsWith(KAFKA_PROPS_PREFIX)){
+ String key = k.substring(KAFKA_PROPS_PREFIX.length());
+ propMap.put(key, v);
+ }
+ });
+ return propMap;
+ }
+
+ public static void closeKafkaAdminClient() {
+ if(kafkaAdminClient != null) {
+ LOGGER.info("Closing the Kafka AdminClient.");
+ kafkaAdminClient.close();
+ kafkaAdminClient=null;
+ }
+ }
+
+ public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) {
+ if(kafkaAdminClient == null) {
+ kafkaAdminClient = AdminClient.create(adminProps);
+ }
+ try {
+ for (String name : kafkaAdminClient.listTopics().names().get()) {
+ if (name.equals(topic)) {
+ LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+ return true;
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+ return false;
+ }
+ return false;
+ }
+
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
index 534fca6b..329c2a36 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,12 +23,28 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
+import io.vavr.collection.Stream;
import io.vavr.control.Option;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -54,9 +71,14 @@ import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitExcepti
import java.net.ConnectException;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -67,24 +89,136 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
private final int maxBatchSize;
private final Duration maxBatchDuration;
private final ClientErrorReasonPresenter clientErrorReasonPresenter;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
-
- public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) {
+
+ private static Properties props;
+ private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS";
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
+ private static Producer<String, String> kafkaProducer;
+ public static Future<RecordMetadata> future;
+ static boolean flag;
+ static Exception exception;
+ public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception {
this.httpClient = httpClient;
this.maxBatchSize = maxBatchSize;
this.maxBatchDuration = maxBatchDuration;
this.clientErrorReasonPresenter = clientErrorReasonPresenter;
+ setProperties();
}
-
+
+ /**
+ * New constructor that does not take DMaaP parameters as arguments.
+ *
+ * @throws Exception
+ */
+ public MessageRouterPublisherImpl() throws Exception {
+ this.httpClient = null;
+ this.maxBatchSize = 0;
+ this.maxBatchDuration = null;
+ this.clientErrorReasonPresenter = null;
+ setProperties();
+ }
+
+// @Override
+// public Flux<MessageRouterPublishResponse> put(
+// MessageRouterPublishRequest request,
+// Flux<? extends JsonElement> items) {
+// return items.bufferTimeout(maxBatchSize, maxBatchDuration)
+// .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems)));
+// }
+
+
@Override
public Flux<MessageRouterPublishResponse> put(
MessageRouterPublishRequest request,
Flux<? extends JsonElement> items) {
- return items.bufferTimeout(maxBatchSize, maxBatchDuration)
- .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems)));
+ flag = true;
+ exception=null;
+ future = null;
+ List<String> batch = getBatch(items);
+ String topic = getTopicFromTopicUrl(request.sinkDefinition().topicUrl());
+ LOGGER.info("Topic extracted from URL {} is : {} ",request.sinkDefinition().topicUrl(),topic);
+ LOGGER.info("Sending a batch of {} items for topic {} to kafka", batch.size(),topic);
+ LOGGER.trace("The items to be sent: {}", batch);
+ if(kafkaProducer == null) {
+ kafkaProducer = new KafkaProducer<>(props);
+ }
+ Flux<MessageRouterPublishResponse> response;
+ try {
+
+ for (String msg : batch) {
+ ProducerRecord<String, String> data =
+ new ProducerRecord<>(topic, msg);
+ future = kafkaProducer.send(data,new Callback() {
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+
+ if(e != null) {
+ flag=false;
+ exception = e;
+ }
+ }
+ });
+ }
+ if(flag) {
+ LOGGER.info("Sent a batch of {} items for topic {} to kafka", batch.size(),topic);
+ response = Flux.just(ImmutableMessageRouterPublishResponse.builder().items(List.ofAll(items.collectList().block())).build());
+ }else {
+ throw exception;
+ }
+ }catch(Exception e) {
+ LOGGER.error("Error while publishing the messages : {}",e.getStackTrace());
+ response = Flux.just(ImmutableMessageRouterPublishResponse.builder()
+ .failReason(e.getMessage())
+ .build());
+ }
+ return response;
}
-
+
+ @Override
+ public void close() {
+ LOGGER.info("Closing the Kafka Producer");
+ if(kafkaProducer != null) {
+ kafkaProducer.close();
+ kafkaProducer=null;
+ }
+ }
+
+ @Override
+ public void setKafkaProducer(Producer<String, String> kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ public static Future<RecordMetadata> getFuture(){
+ return future;
+ }
+
+ void setProperties() throws Exception {
+ props = setKafkaPropertiesFromSystemEnv(System.getenv());
+
+ if(System.getenv(kafkaBootstrapServers) == null) {
+ LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ }else {
+ props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+ }
+ if(System.getenv("JAAS_CONFIG") == null) {
+ LOGGER.info("Not using any authentication for kafka interaction");
+ }else {
+ LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+ }
+ }
+
+ static List<String> getBatch(Flux<? extends JsonElement> items){
+ java.util.List<String> list = new ArrayList<>();
+ items.map(msg -> msg.toString()).collectList().subscribe(data -> list.addAll(data));
+ return List.ofAll(list);
+
+ }
+
private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr(
MessageRouterPublishRequest request,
List<JsonElement> batch) {
@@ -153,4 +287,5 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
.getOrElse(HashMap.empty());
return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString());
}
+
}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
index d98e8d3a..c90a8064 100644
--- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
+++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +30,16 @@ import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -40,19 +51,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -63,27 +83,134 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
private final Gson gson;
private final ClientErrorReasonPresenter clientErrorReasonPresenter;
private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
-
+ private static Properties props;
+ private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS";
+ private static Consumer<String, String> consumer;
+
public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
- ClientErrorReasonPresenter clientErrorReasonPresenter) {
+ ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception {
this.httpClient = httpClient;
this.gson = gson;
this.clientErrorReasonPresenter = clientErrorReasonPresenter;
+ setProperties();
+ }
+
+ /**
+ * New constructor that does not take DMaaP parameters as arguments.
+ *
+ * @throws Exception
+ */
+ public MessageRouterSubscriberImpl() throws Exception {
+ this.httpClient = null;
+ this.gson = null;
+ this.clientErrorReasonPresenter = null;
+ setProperties();
}
@Override
public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
- LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
- return httpClient.call(buildGetHttpRequest(request))
- .map(this::buildGetResponse)
- .doOnError(ReadTimeoutException.class,
- e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
- .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
- .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
- .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+ LOGGER.info("Requesting new items from DMaaP MR: {}", request);
+ String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl());
+
+ String fakeGroupName = request.consumerGroup();
+ props.put("client.id", request.consumerId());
+ props.put("group.id", fakeGroupName);
+
+ try{
+ if (consumer == null) {
+ if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) {
+ LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic);
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason("404 Topic Not Found")
+ .build());
+ }
+ consumer = getKafkaConsumer(props);
+ consumer.subscribe(Arrays.asList(topic));
+ }
+ ArrayList<String> msgs = new ArrayList<>();
+
+ ConsumerRecords<String, String> records = null;
+ synchronized (consumer) {
+ records = consumer.poll(Duration.ofMillis(500));
+ }
+ for (ConsumerRecord<String, String> rec : records) {
+ msgs.add(rec.value());
+ }
+ List<JsonElement> list = List.ofAll(msgs).map(r -> JsonParser.parseString(r));
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .items(list)
+ .build());
+ } catch(Exception e) {
+ LOGGER.error("Error while consuming the messages : {}",e.getMessage());
+ return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+ .failReason(e.getMessage())
+ .build());
+ }
}
-
+
+ @Override
+ public void setConsumer(Consumer<String, String> consumer) {
+ MessageRouterSubscriberImpl.consumer = consumer;
+ }
+
+ public static KafkaConsumer<String, String> getKafkaConsumer(Properties props){
+ return new KafkaConsumer<>(props);
+ }
+
+ @Override
+ public void close(){
+ if(consumer != null) {
+ LOGGER.info("Closing the Kafka Consumer");
+ consumer.close();
+ consumer = null;
+ }
+ Commons.closeKafkaAdminClient();
+ }
+
+ void setProperties() throws Exception {
+ props = setKafkaPropertiesFromSystemEnv(System.getenv());
+
+ if(System.getenv(kafkaBootstrapServers) == null) {
+ LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing");
+ }else {
+ props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+ }
+ props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false);
+
+ if(System.getenv("JAAS_CONFIG") == null) {
+ LOGGER.info("Not using any authentication for kafka interaction");
+ }else {
+ LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+ }
+ }
+
+// @Override
+// public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
+// LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
+// return httpClient.call(buildGetHttpRequest(request))
+// .map(this::buildGetResponse)
+// .doOnError(ReadTimeoutException.class,
+// e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+// .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+// .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+// .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+// .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+// }
+ public static Properties getAdminProps() {
+ Properties adminProps = new Properties();
+ adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+ if(System.getenv("JAAS_CONFIG") != null) {
+ adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+ adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+ adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+ }
+ return adminProps;
+ }
+
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
return ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
@@ -132,4 +259,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
.map(HashMap::of)
.getOrElse(HashMap.empty());
}
+
+
+
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
index 5b1984df..a1f9ac9f 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,7 +39,6 @@ final class DMaapContainer {
static DockerComposeContainer createContainerInstance(){
return new DockerComposeContainer(
new File(DOCKER_COMPOSE_FILE_PATH))
- .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)
.withLocalCompose(true);
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
index a1ad951f..a806ba19 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
@@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mockStatic;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
@@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
+@ExtendWith(SystemStubsExtension.class)
@Testcontainers
class MessageRouterPublisherIT {
@Container
@@ -74,6 +91,7 @@ class MessageRouterPublisherIT {
private static String EVENTS_PATH;
private static String PROXY_MOCK_EVENTS_PATH;
+ private static final long REPEAT_SUBSCRIPTION = 20;
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+ "{"
@@ -108,22 +126,46 @@ class MessageRouterPublisherIT {
+ "}"
+ "}";
- private final MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private final MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+ private MessageRouterPublisher publisher;
+ private MessageRouterSubscriber subscriber;
+ Mono<MessageRouterSubscribeResponse> response;
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
@BeforeAll
static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ //sleep introduced to wait till all containers are started
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
-
+
+ @AfterEach
+ void afterEach() {
+ publisher.close();
+ subscriber.close();
+ }
+
@BeforeEach
void set() {
MOCK_SERVER_CLIENT.reset();
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("kafka.auto.offset.reset","earliest");
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ response=null;
+
}
-
+
+ @Disabled
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
@@ -143,7 +185,8 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleBadRequestError() {
//given
@@ -175,13 +218,16 @@ class MessageRouterPublisherIT {
final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
+
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -200,12 +246,15 @@ class MessageRouterPublisherIT {
final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -229,11 +278,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -256,10 +307,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -283,10 +337,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -310,10 +367,13 @@ class MessageRouterPublisherIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, plainBatch)
- .then(subscriber.get(subscribeRequest));
+ publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
@@ -321,7 +381,8 @@ class MessageRouterPublisherIT {
.expectComplete()
.verify();
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
//given
@@ -347,7 +408,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
//given
@@ -380,7 +442,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
//given
@@ -412,7 +475,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
//given
@@ -453,7 +517,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleLastRetryError500() {
//given
@@ -489,7 +554,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
//given
@@ -521,7 +587,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
//given
@@ -553,7 +620,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
}
-
+
+ @Disabled
@Test
void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
//given
@@ -581,7 +649,8 @@ class MessageRouterPublisherIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path)
.withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
//given
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
index 97fd26f5..816021bb 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
@@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
*/
+@ExtendWith(SystemStubsExtension.class)
class MessageRouterPublisherTest {
private static final String ERROR_MESSAGE = "Something went wrong";
@@ -71,9 +91,9 @@ class MessageRouterPublisherTest {
private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
private static final DummyHttpServer SERVER = initialize();
- private MessageRouterPublisher sut = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+ private MessageRouterPublisher sut;
+ MockProducer<String, String> mockProducer =
+ new MockProducer<>(true, new StringSerializer(), new StringSerializer());
private static DummyHttpServer initialize() {
return DummyHttpServer.start(routes -> routes
.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
@@ -86,7 +106,25 @@ class MessageRouterPublisherTest {
.post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
);
}
-
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ @BeforeEach
+ void setUp() {
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("JAAS_CONFIG", "jaas.config");
+
+ sut = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ sut.setKafkaProducer(mockProducer);
+ }
+ @AfterEach
+ void afterEach() {
+ sut.close();
+ }
+
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
@@ -102,7 +140,45 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
+
+ @Test
+ void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception {
+ //given
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+ final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
+ sut = new MessageRouterPublisherImpl();
+ sut.setKafkaProducer(mockProducer);
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ //then
+ StepVerifier.create(result)
+ .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+ @Test
+ void publisher_shouldHandleError() {
+
+ sut.setKafkaProducer(mockProducer);
+
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+ RuntimeException e = new RuntimeException();
+ mockProducer.errorNext(e);
+ Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture();
+ try{
+ record.get();
+ }catch(Exception ex) {
+ assertEquals(e, ex);
+ }
+ assertTrue(record.isDone());
+
+ }
+
+ @Disabled
@ParameterizedTest
@CsvSource({
FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",
@@ -126,7 +202,8 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleClientTimeoutError() {
//given
@@ -142,7 +219,8 @@ class MessageRouterPublisherTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void publisher_shouldHandleConnectionError() {
//given
@@ -179,9 +257,7 @@ class MessageRouterPublisherTest {
private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
return ImmutableMessageRouterSink.builder()
.name("the topic")
- .topicUrl(String.format("http://%s:%d%s",
- dummyHttpServer.host(),
- dummyHttpServer.port(),
+ .topicUrl(String.format("http://dmaap-mr%s",
topicPath)
)
.build();
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index 3d43e817..48a12455 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockserver.client.MockServerClient;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
@@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
+@ExtendWith(SystemStubsExtension.class)
@Testcontainers
class MessageRouterSubscriberIT {
@Container
@@ -71,16 +80,11 @@ class MessageRouterSubscriberIT {
private static String EVENTS_PATH;
private static String PROXY_MOCK_EVENTS_PATH;
+ private static final long REPEAT_SUBSCRIPTION = 20;
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String CONSUMER_GROUP = "group1";
private static final String CONSUMER_ID = "consumer200";
- private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
- "{" +
- "\"mrstatus\":3001," +
- "\"helpURL\":\"http://onap.readthedocs.io\"," +
- "\"message\":\"No such topic exists.-[%s]\"," +
- "\"status\":404" +
- "}";
+ private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ "{"
+ "\"requestError\":"
@@ -94,22 +98,44 @@ class MessageRouterSubscriberIT {
+ "}"
+ "}";
- private MessageRouterPublisher publisher = DmaapClientFactory
- .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
- private MessageRouterSubscriber subscriber = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+ private MessageRouterPublisher publisher;
+ private MessageRouterSubscriber subscriber;
+ Mono<MessageRouterSubscribeResponse> response;
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
@BeforeAll
static void setUp() {
EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+ //sleep introduced to wait till all containers are started
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
@BeforeEach
void set() {
MOCK_SERVER_CLIENT.reset();
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("kafka.auto.offset.reset","earliest");
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ response=null;
+ }
+
+ @AfterEach
+ void afterEach() {
+ if(publisher != null)
+ publisher.close();
+ if(subscriber != null)
+ subscriber.close();
}
-
@Test
void subscriber_shouldHandleNoSuchTopicException() {
//given
@@ -144,11 +170,13 @@ class MessageRouterSubscriberIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -171,11 +199,14 @@ class MessageRouterSubscriberIT {
final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
//when
- registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
- Mono<MessageRouterSubscribeResponse> response = publisher
- .put(publishRequest, jsonMessageBatch)
- .then(subscriber.get(subscribeRequest));
-
+ publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+ .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+ if(!resp.items().isEmpty()) {
+ response = Mono.just(resp);
+ }
+ });
+ });
+
//then
StepVerifier.create(response)
.expectNext(expectedResponse)
@@ -183,6 +214,7 @@ class MessageRouterSubscriberIT {
.verify();
}
+ @Disabled
@Test
void subscriber_shouldExtractItemsFromResponse() {
//given
@@ -208,7 +240,8 @@ class MessageRouterSubscriberIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldSubscribeToTopic() {
//given
@@ -235,7 +268,8 @@ class MessageRouterSubscriberIT {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldHandleTimeoutException() {
//given
@@ -261,6 +295,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
}
+ @Disabled
@Test
void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
//given
@@ -298,6 +333,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
//given
@@ -336,6 +372,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
//given
@@ -383,6 +420,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Disabled
@Test
void subscriber_shouldHandleLastRetryError500() {
//given
@@ -416,6 +454,7 @@ class MessageRouterSubscriberIT {
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
+ @Disabled
@Test
void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() {
//given
@@ -445,6 +484,7 @@ class MessageRouterSubscriberIT {
.verify(TIMEOUT);
}
+ @Disabled
@Test
void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() {
//given
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
index e928f03c..db1fb4fc 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,12 +24,27 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedStatic;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
@@ -39,10 +55,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -51,6 +76,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
*/
+@ExtendWith(SystemStubsExtension.class)
class MessageRouterSubscriberTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String ERROR_MESSAGE = "Something went wrong";
@@ -64,6 +90,10 @@ class MessageRouterSubscriberTest {
private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
+
+ private static final String POLL_EXCEPTION_MESSAGE = "Poll Exception";
+ private static final String TOPIC_NOT_FOUND_ERROR_MESSAGE = "404 Topic Not Found";
+
private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
@@ -85,13 +115,15 @@ class MessageRouterSubscriberTest {
private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
private static final DummyHttpServer SERVER = initialize();
- private MessageRouterSubscriber sut = DmaapClientFactory
- .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ private MessageRouterSubscriber sut;
private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
-
+ static MockConsumer<String, String> mockConsumer;// = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ Properties prop = new Properties();
+ static MockedStatic<Commons> commonsMock;
+
private static DummyHttpServer initialize() {
return DummyHttpServer.start(routes -> routes
.get(SUCCESS_RESP_PATH, (req, resp) ->
@@ -103,9 +135,61 @@ class MessageRouterSubscriberTest {
.get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
.get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
}
-
+
+ @SystemStub
+ EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ @BeforeAll
+ static void set() {
+ commonsMock = mockStatic(Commons.class);
+ }
+ @AfterEach
+ void afterEach() {
+ sut.close();
+ }
+ @AfterAll
+ static void after() {
+ commonsMock.close();
+ }
+ @BeforeEach
+ void setup() {
+
+ when(Commons.setKafkaPropertiesFromSystemEnv(System.getenv())).thenReturn(prop);
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ environmentVariables
+ .set("BOOTSTRAP_SERVERS", "localhost:9092")
+ .set("JAAS_CONFIG", "jaas.config");
+
+ sut = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+
+ configureMockConsumer();
+ sut.setConsumer(mockConsumer);
+
+
+ }
+
+ private void configureMockConsumer() {
+ mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
+
+ HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
+ mockConsumer.updateBeginningOffsets(beginningOffsets);
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
+ mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));
+ }
+
+ private void stubForTopicCheck(boolean response) {
+
+ when(Commons.checkIfTopicIsPresentInKafka("TOPIC",MessageRouterSubscriberImpl.getAdminProps())).thenReturn(response);
+ when(Commons.getTopicFromTopicUrl("http://dmaap-mr/events/TOPIC")).thenReturn("TOPIC");
+ }
+
@Test
void subscriber_shouldGetCorrectResponse() {
+
+ stubForTopicCheck(true);
Mono<MessageRouterSubscribeResponse> response = sut
.get(mrSuccessRequest);
@@ -119,9 +203,49 @@ class MessageRouterSubscriberTest {
StepVerifier.create(response)
.expectNext(expectedResponse)
.expectComplete()
- .verify(TIMEOUT);
+ .verify();
}
+
+ @Test
+ void subscriber_shouldGetCorrectResponse_ForConstructorWithoutDMaapParameters() throws Exception {
+ sut = new MessageRouterSubscriberImpl();
+ sut.setConsumer(mockConsumer);
+ stubForTopicCheck(true);
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+
+ List<String> expectedItems = List.of("I", "like", "pizza");
+ MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build();
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ void whenTopicNotFound_shouldReturnError() {
+ stubForTopicCheck(false);
+ sut.setConsumer(null);
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+
+ MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(TOPIC_NOT_FOUND_ERROR_MESSAGE)
+ .build();
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Disabled
@ParameterizedTest
@CsvSource({
FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized",
@@ -144,29 +268,60 @@ class MessageRouterSubscriberTest {
@Test
void subscriber_shouldParseCorrectResponse() {
+ stubForTopicCheck(true);
final Flux<String> result = sut
.getElements(mrSuccessRequest)
.map(JsonElement::getAsString);
-
StepVerifier.create(result)
.expectNext("I", "like", "pizza")
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Test
+ void whenSubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
+ stubForTopicCheck(true);
+ MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ consumer.schedulePollTask(() -> {
+ consumer.setPollException(new KafkaException(POLL_EXCEPTION_MESSAGE));
+ });
+ HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
+ TopicPartition tp = new TopicPartition("TOPIC", 0);
+ startOffsets.put(tp, 0L);
+ consumer.updateBeginningOffsets(startOffsets);
+ sut.setConsumer(consumer);
+
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+ assertThatExceptionOfType(KafkaException.class)
+ .isThrownBy(() -> {throw new KafkaException(POLL_EXCEPTION_MESSAGE);})
+ .withMessage(POLL_EXCEPTION_MESSAGE);
+
+ StepVerifier.create(response)
+ .expectNext(ImmutableMessageRouterSubscribeResponse.builder().failReason(POLL_EXCEPTION_MESSAGE).build())
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ }
+
+
@Test
void subscriber_shouldParseErrorResponse() {
+ stubForTopicCheck(false);
+ sut.setConsumer(null);
Flux<String> result = sut
.getElements(mrFailingRequest)
.map(JsonElement::getAsString);
-
+
StepVerifier.create(result)
.expectError(IllegalStateException.class)
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldSubscribeCorrectly() {
+
Flux<String> subscriptionForElements = sut
.subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
.map(JsonElement::getAsString);
@@ -176,7 +331,8 @@ class MessageRouterSubscriberTest {
.expectComplete()
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldParseErrorWhenSubscribed() {
Flux<String> subscriptionForElements = sut
@@ -187,7 +343,8 @@ class MessageRouterSubscriberTest {
.expectError(IllegalStateException.class)
.verify(TIMEOUT);
}
-
+
+ @Disabled
@Test
void subscriber_shouldHandleClientTimeoutError() {
Duration requestTimeout = Duration.ofMillis(1);
@@ -200,6 +357,7 @@ class MessageRouterSubscriberTest {
.verify(TIMEOUT);
}
+ @Disabled
@Test
void subscriber_shouldHandleConnectionError() {
MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
@@ -214,7 +372,7 @@ class MessageRouterSubscriberTest {
private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
return ImmutableMessageRouterSource.builder()
.name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+ .topicUrl(String.format("http://dmaap-mr/events/TOPIC"))
.build();
}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java
index 72c35925..3d35c2ac 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +26,13 @@ import io.vavr.Tuple2;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Properties;
class CommonsTest {
@@ -53,7 +59,26 @@ class CommonsTest {
// then
verifyBasicAuthHeader(basicAuthHeader, "Og==");
}
-
+
+ @Test
+ void shouldFetchTopicFromTopicURL() {
+ String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT";
+ String expected = "unauthenticated.VES_PNFREG_OUTPUT";
+ assertThat(getTopicFromTopicUrl(topicUrl))
+ .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected)
+ .isEqualTo(expected);
+ }
+
+ @Test
+ void shouldFetchTopicFromTopicUrlEndingWithSlash() {
+ String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT/";
+ String expected = "unauthenticated.VES_PNFREG_OUTPUT";
+ assertThat(getTopicFromTopicUrl(topicUrl))
+ .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected)
+ .isEqualTo(expected);
+ }
+
+
private AafCredentials create(String username, String password) {
return ImmutableAafCredentials.builder()
.username(username)
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
index 6c6ded16..2e169dc4 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +31,8 @@ import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMultimap;
import io.vavr.collection.List;
+
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -68,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since April 2019
*/
+@Disabled
class MessageRouterPublisherImplTest {
private static final Duration TIMEOUT = Duration.ofSeconds(5);
private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
@@ -75,14 +79,17 @@ class MessageRouterPublisherImplTest {
private static final String ERROR_MESSAGE = "Something went wrong";
private final RxHttpClient httpClient = mock(RxHttpClient.class);
private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
- private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
- httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
+ private final MessageRouterPublisher cut;
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
-
+
+ private MessageRouterPublisherImplTest() throws Exception{
+ cut = new MessageRouterPublisherImpl(
+ httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
+ }
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
// given
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
index 006965c2..373424ba 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
@@ -3,6 +3,7 @@
* DCAEGEN2-SERVICES-SDK
* =========================================================
* Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,11 +32,17 @@ import static org.mockito.Mockito.verify;
import com.google.gson.JsonSyntaxException;
import io.netty.handler.timeout.ReadTimeoutException;
import io.vavr.collection.HashMultimap;
+
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
@@ -50,6 +57,8 @@ import java.net.ConnectException;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
*/
+
+@Disabled
class MessageRouterSubscriberImplTest {
private static final String ERROR_MESSAGE = "Something went wrong";
@@ -57,8 +66,7 @@ class MessageRouterSubscriberImplTest {
private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
private final MessageRouterSubscriber
- cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
-
+ cut;
private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
.name("sample topic")
@@ -96,7 +104,10 @@ class MessageRouterSubscriberImplTest {
.rawBody("{}".getBytes())
.headers(HashMultimap.withSeq().empty())
.build();
-
+ private MessageRouterSubscriberImplTest() throws Exception{
+ cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
+ }
+
@Test
void getWithProperRequest_shouldReturnCorrectResponse() {
// given
diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
index 26eb1763..85e1b19e 100644
--- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
+++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
@@ -32,7 +32,7 @@ services:
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://localhost:9092
KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
@@ -48,25 +48,6 @@ services:
depends_on:
- zookeeper
- dmaap:
- image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.18
- ports:
- - "3904:3904"
- - "3905:3905"
- environment:
- enableCadi: 'false'
- volumes:
- - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties
- - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml
- - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties
- networks:
- net:
- aliases:
- - dmaap
- depends_on:
- - zookeeper
- - kafka
-
mockserver:
image: mockserver/mockserver:mockserver-5.11.2
command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap
@@ -74,8 +55,6 @@ services:
- "1080:1090"
networks:
- net
- depends_on:
- - dmaap
networks:
net: