diff options
author | sushant53 <sushant.jadhav@t-systems.com> | 2023-08-11 19:45:44 +0530 |
---|---|---|
committer | Sushant Jadhav <sushant.jadhav@t-systems.com> | 2023-09-11 12:09:33 +0000 |
commit | 86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch) | |
tree | 0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services | |
parent | 9d8a9326758a162eb26236a1dd9de1c29c504554 (diff) |
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs.
Issue-ID: DCAEGEN2-3364
Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services')
16 files changed, 922 insertions, 149 deletions
diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index 8123af31..5a3ef94b 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -2,6 +2,7 @@ <!-- ============LICENSE_START======================================================= Copyright (c) 2022 Nokia. All rights reserved. +Copyright (c) 2023 Deutsche Telekom AG. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy @@ -98,5 +99,22 @@ language governing permissions and limitations under the License. <artifactId>mockserver-client-java</artifactId> <version>${mockserver-client.version}</version> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.3.1</version> + </dependency> + <dependency> + <groupId>uk.org.webcompere</groupId> + <artifactId>system-stubs-jupiter</artifactId> + <version>1.1.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index ee4f6d38..6e7f6049 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +36,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRou import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; @@ -46,26 +49,37 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.co * @since 1.1.4 */ public final class DmaapClientFactory { - + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapClientFactory.class); private DmaapClientFactory() { } public static @NotNull MessageRouterPublisher createMessageRouterPublisher( @NotNull MessageRouterPublisherConfig clientConfiguration) { - return new MessageRouterPublisherImpl( + try { + return new MessageRouterPublisherImpl( createHttpClient(clientConfiguration), clientConfiguration.maxBatchSize(), clientConfiguration.maxBatchDuration(), new ClientErrorReasonPresenter()); + } catch (Exception e) { + LOGGER.error("Error while creating the Message Router Publisher."); + return null; + } } public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber( @NotNull MessageRouterSubscriberConfig clientConfiguration) { - return new MessageRouterSubscriberImpl( - createHttpClient(clientConfiguration), - clientConfiguration.gsonInstance(), - new ClientErrorReasonPresenter()); + try { + return new MessageRouterSubscriberImpl( + createHttpClient(clientConfiguration), + clientConfiguration.gsonInstance(), + new ClientErrorReasonPresenter()); + } catch (Exception e) { + LOGGER.error("Error while creating the Message Router Subscriber."); + return null; + } + } private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) { diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java index 08825b4c..f98e8198 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; + +import io.vavr.collection.List; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.Producer; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -30,5 +38,7 @@ import reactor.core.publisher.Flux; * @since 1.1.4 */ public interface MessageRouterPublisher { + void close(); + void setKafkaProducer(Producer<String, String> kafkaProducer); Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java index d91535d3..fbf90d9e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +22,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; + +import io.vavr.collection.List; + import java.time.Duration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import reactor.core.publisher.Flux; @@ -33,6 +41,8 @@ import reactor.core.publisher.Mono; */ public interface MessageRouterSubscriber { + void setConsumer(Consumer<String, String> consumer); + void close(); Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request); default Flux<JsonElement> getElements(MessageRouterSubscribeRequest request) { @@ -49,4 +59,5 @@ public interface MessageRouterSubscriber { default Flux<JsonElement> subscribeForElements(MessageRouterSubscribeRequest request, Duration period) { return Flux.interval(period).concatMap(i->getElements(request)); } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java index 9f534d8f..4ea80e48 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import io.vavr.Tuple; import io.vavr.Tuple2; import io.vavr.control.Option; + +import org.apache.kafka.clients.admin.AdminClient; import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 */ -final class Commons { +public final class Commons { + static String commonInURL = "/events/"; + static String KAFKA_PROPS_PREFIX = "kafka."; + + private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class); + private static AdminClient kafkaAdminClient; + private static Map<String,Object> map = new HashMap<>(); + static { + map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("max.poll.interval.ms", 300000); + map.put("heartbeat.interval.ms", 60000); + map.put("session.timeout.ms", 240000); + map.put("max.poll.records", 1000); + } private Commons() { } @@ -67,4 +92,69 @@ final class Commons { .map(s -> s.getBytes(StandardCharsets.UTF_8)) .getOrElse(new byte[0]); } + /** + * Extracts the topic name from the topicUrl. + * + * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p> + * + * @param topicUrl + * @return topic + */ + public static String getTopicFromTopicUrl(String topicUrl) { + if(topicUrl.endsWith("/")) { + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/")); + } + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length()); + } + + public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) { + Map<String, Object> propMap= getKafkaPropertiesMap(envs); + Properties props = new Properties(); + propMap.forEach((k ,v) -> props.put(k, v)); + map.forEach((k,v) -> { + if(!propMap.containsKey(k)) { + props.put(k, v); + } + }); + + return props; + } + + static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){ + Map<String, Object> propMap = new HashMap<>(); + envs.forEach((k ,v) -> { + if(k.startsWith(KAFKA_PROPS_PREFIX)){ + String key = k.substring(KAFKA_PROPS_PREFIX.length()); + propMap.put(key, v); + } + }); + return propMap; + } + + public static void closeKafkaAdminClient() { + if(kafkaAdminClient != null) { + LOGGER.info("Closing the Kafka AdminClient."); + kafkaAdminClient.close(); + kafkaAdminClient=null; + } + } + + public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) { + if(kafkaAdminClient == null) { + kafkaAdminClient = AdminClient.create(adminProps); + } + try { + for (String name : kafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return true; + } + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return false; + } + return false; + } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 534fca6b..329c2a36 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +23,28 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; +import io.vavr.collection.Stream; import io.vavr.control.Option; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -54,9 +71,14 @@ import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitExcepti import java.net.ConnectException; import java.time.Duration; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -67,24 +89,136 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final int maxBatchSize; private final Duration maxBatchDuration; private final ClientErrorReasonPresenter clientErrorReasonPresenter; - - private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); - - public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) { + + private static Properties props; + private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; + private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); + private static Producer<String, String> kafkaProducer; + public static Future<RecordMetadata> future; + static boolean flag; + static Exception exception; + public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.maxBatchSize = maxBatchSize; this.maxBatchDuration = maxBatchDuration; this.clientErrorReasonPresenter = clientErrorReasonPresenter; + setProperties(); } - + + /** + * New constructor that does not take DMaaP parameters as arguments. + * + * @throws Exception + */ + public MessageRouterPublisherImpl() throws Exception { + this.httpClient = null; + this.maxBatchSize = 0; + this.maxBatchDuration = null; + this.clientErrorReasonPresenter = null; + setProperties(); + } + +// @Override +// public Flux<MessageRouterPublishResponse> put( +// MessageRouterPublishRequest request, +// Flux<? extends JsonElement> items) { +// return items.bufferTimeout(maxBatchSize, maxBatchDuration) +// .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems))); +// } + + @Override public Flux<MessageRouterPublishResponse> put( MessageRouterPublishRequest request, Flux<? extends JsonElement> items) { - return items.bufferTimeout(maxBatchSize, maxBatchDuration) - .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems))); + flag = true; + exception=null; + future = null; + List<String> batch = getBatch(items); + String topic = getTopicFromTopicUrl(request.sinkDefinition().topicUrl()); + LOGGER.info("Topic extracted from URL {} is : {} ",request.sinkDefinition().topicUrl(),topic); + LOGGER.info("Sending a batch of {} items for topic {} to kafka", batch.size(),topic); + LOGGER.trace("The items to be sent: {}", batch); + if(kafkaProducer == null) { + kafkaProducer = new KafkaProducer<>(props); + } + Flux<MessageRouterPublishResponse> response; + try { + + for (String msg : batch) { + ProducerRecord<String, String> data = + new ProducerRecord<>(topic, msg); + future = kafkaProducer.send(data,new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + + if(e != null) { + flag=false; + exception = e; + } + } + }); + } + if(flag) { + LOGGER.info("Sent a batch of {} items for topic {} to kafka", batch.size(),topic); + response = Flux.just(ImmutableMessageRouterPublishResponse.builder().items(List.ofAll(items.collectList().block())).build()); + }else { + throw exception; + } + }catch(Exception e) { + LOGGER.error("Error while publishing the messages : {}",e.getStackTrace()); + response = Flux.just(ImmutableMessageRouterPublishResponse.builder() + .failReason(e.getMessage()) + .build()); + } + return response; } - + + @Override + public void close() { + LOGGER.info("Closing the Kafka Producer"); + if(kafkaProducer != null) { + kafkaProducer.close(); + kafkaProducer=null; + } + } + + @Override + public void setKafkaProducer(Producer<String, String> kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + public static Future<RecordMetadata> getFuture(){ + return future; + } + + void setProperties() throws Exception { + props = setKafkaPropertiesFromSystemEnv(System.getenv()); + + if(System.getenv(kafkaBootstrapServers) == null) { + LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); + throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); + }else { + props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + } + if(System.getenv("JAAS_CONFIG") == null) { + LOGGER.info("Not using any authentication for kafka interaction"); + }else { + LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + } + + static List<String> getBatch(Flux<? extends JsonElement> items){ + java.util.List<String> list = new ArrayList<>(); + items.map(msg -> msg.toString()).collectList().subscribe(data -> list.addAll(data)); + return List.ofAll(list); + + } + private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr( MessageRouterPublishRequest request, List<JsonElement> batch) { @@ -153,4 +287,5 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .getOrElse(HashMap.empty()); return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString()); } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index d98e8d3a..c90a8064 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +30,16 @@ import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.control.Option; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -40,19 +51,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -63,27 +83,134 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { private final Gson gson; private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class); - + private static Properties props; + private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; + private static Consumer<String, String> consumer; + public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson, - ClientErrorReasonPresenter clientErrorReasonPresenter) { + ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.gson = gson; this.clientErrorReasonPresenter = clientErrorReasonPresenter; + setProperties(); + } + + /** + * New constructor that does not take DMaaP parameters as arguments. + * + * @throws Exception + */ + public MessageRouterSubscriberImpl() throws Exception { + this.httpClient = null; + this.gson = null; + this.clientErrorReasonPresenter = null; + setProperties(); } @Override public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) { - LOGGER.debug("Requesting new items from DMaaP MR: {}", request); - return httpClient.call(buildGetHttpRequest(request)) - .map(this::buildGetResponse) - .doOnError(ReadTimeoutException.class, - e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) - .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) - .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) - .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); + LOGGER.info("Requesting new items from DMaaP MR: {}", request); + String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl()); + + String fakeGroupName = request.consumerGroup(); + props.put("client.id", request.consumerId()); + props.put("group.id", fakeGroupName); + + try{ + if (consumer == null) { + if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) { + LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .failReason("404 Topic Not Found") + .build()); + } + consumer = getKafkaConsumer(props); + consumer.subscribe(Arrays.asList(topic)); + } + ArrayList<String> msgs = new ArrayList<>(); + + ConsumerRecords<String, String> records = null; + synchronized (consumer) { + records = consumer.poll(Duration.ofMillis(500)); + } + for (ConsumerRecord<String, String> rec : records) { + msgs.add(rec.value()); + } + List<JsonElement> list = List.ofAll(msgs).map(r -> JsonParser.parseString(r)); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .items(list) + .build()); + } catch(Exception e) { + LOGGER.error("Error while consuming the messages : {}",e.getMessage()); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .failReason(e.getMessage()) + .build()); + } } - + + @Override + public void setConsumer(Consumer<String, String> consumer) { + MessageRouterSubscriberImpl.consumer = consumer; + } + + public static KafkaConsumer<String, String> getKafkaConsumer(Properties props){ + return new KafkaConsumer<>(props); + } + + @Override + public void close(){ + if(consumer != null) { + LOGGER.info("Closing the Kafka Consumer"); + consumer.close(); + consumer = null; + } + Commons.closeKafkaAdminClient(); + } + + void setProperties() throws Exception { + props = setKafkaPropertiesFromSystemEnv(System.getenv()); + + if(System.getenv(kafkaBootstrapServers) == null) { + LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); + throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); + }else { + props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + } + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false); + + if(System.getenv("JAAS_CONFIG") == null) { + LOGGER.info("Not using any authentication for kafka interaction"); + }else { + LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + } + +// @Override +// public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) { +// LOGGER.debug("Requesting new items from DMaaP MR: {}", request); +// return httpClient.call(buildGetHttpRequest(request)) +// .map(this::buildGetResponse) +// .doOnError(ReadTimeoutException.class, +// e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) +// .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) +// .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) +// .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) +// .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); +// } + public static Properties getAdminProps() { + Properties adminProps = new Properties(); + adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + if(System.getenv("JAAS_CONFIG") != null) { + adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + return adminProps; + } + private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { return ImmutableHttpRequest.builder() .method(HttpMethod.GET) @@ -132,4 +259,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .map(HashMap::of) .getOrElse(HashMap.empty()); } + + + } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java index 5b1984df..a1f9ac9f 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +39,6 @@ final class DMaapContainer { static DockerComposeContainer createContainerInstance(){ return new DockerComposeContainer( new File(DOCKER_COMPOSE_FILE_PATH)) - .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT) .withLocalCompose(true); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java index a1ad951f..a806ba19 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; @@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mockStatic; import static org.mockserver.model.HttpRequest.request; import static org.mockserver.model.HttpResponse.response; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest; @@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterPublisherIT { @Container @@ -74,6 +91,7 @@ class MessageRouterPublisherIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n" + "{" @@ -108,22 +126,46 @@ class MessageRouterPublisherIT { + "}" + "}"; - private final MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private final MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - + + @AfterEach + void afterEach() { + publisher.close(); + subscriber.close(); + } + @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } - + + @Disabled @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -143,7 +185,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleBadRequestError() { //given @@ -175,13 +218,16 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -200,12 +246,15 @@ class MessageRouterPublisherIT { final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl); final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId"); final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); - + //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -229,11 +278,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -256,10 +307,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -283,10 +337,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -310,10 +367,13 @@ class MessageRouterPublisherIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, plainBatch) - .then(subscriber.get(subscribeRequest)); + publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) @@ -321,7 +381,8 @@ class MessageRouterPublisherIT { .expectComplete() .verify(); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() { //given @@ -347,7 +408,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() { //given @@ -380,7 +442,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() { //given @@ -412,7 +475,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldRetryManyTimesAndSuccessfullyPublish() { //given @@ -453,7 +517,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } - + + @Disabled @Test void publisher_shouldHandleLastRetryError500() { //given @@ -489,7 +554,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() { //given @@ -521,7 +587,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() { //given @@ -553,7 +620,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2)); } - + + @Disabled @Test void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() { //given @@ -581,7 +649,8 @@ class MessageRouterPublisherIT { MOCK_SERVER_CLIENT.verify(request().withPath(path) .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1)); } - + + @Disabled @Test void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() { //given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java index 97fd26f5..816021bb 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Ignore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; @@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; @@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ +@ExtendWith(SystemStubsExtension.class) class MessageRouterPublisherTest { private static final String ERROR_MESSAGE = "Something went wrong"; @@ -71,9 +91,9 @@ class MessageRouterPublisherTest { private static final List<String> messageBatchItems = List.of("ala", "ma", "kota"); private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); private static final DummyHttpServer SERVER = initialize(); - private MessageRouterPublisher sut = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - + private MessageRouterPublisher sut; + MockProducer<String, String> mockProducer = + new MockProducer<>(true, new StringSerializer(), new StringSerializer()); private static DummyHttpServer initialize() { return DummyHttpServer.start(routes -> routes .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK"))) @@ -86,7 +106,25 @@ class MessageRouterPublisherTest { .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) ); } - + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @BeforeEach + void setUp() { + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("JAAS_CONFIG", "jaas.config"); + + sut = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + sut.setKafkaProducer(mockProducer); + } + @AfterEach + void afterEach() { + sut.close(); + } + @Test void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() { //given @@ -102,7 +140,45 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } + + @Test + void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception { + //given + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new); + sut = new MessageRouterPublisherImpl(); + sut.setKafkaProducer(mockProducer); + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + //then + StepVerifier.create(result) + .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build()) + .expectComplete() + .verify(TIMEOUT); + } + @Test + void publisher_shouldHandleError() { + + sut.setKafkaProducer(mockProducer); + + final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER); + + //when + final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch); + RuntimeException e = new RuntimeException(); + mockProducer.errorNext(e); + Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture(); + try{ + record.get(); + }catch(Exception ex) { + assertEquals(e, ex); + } + assertTrue(record.isDone()); + + } + + @Disabled @ParameterizedTest @CsvSource({ FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request", @@ -126,7 +202,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleClientTimeoutError() { //given @@ -142,7 +219,8 @@ class MessageRouterPublisherTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void publisher_shouldHandleConnectionError() { //given @@ -179,9 +257,7 @@ class MessageRouterPublisherTest { private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) { return ImmutableMessageRouterSink.builder() .name("the topic") - .topicUrl(String.format("http://%s:%d%s", - dummyHttpServer.host(), - dummyHttpServer.port(), + .topicUrl(String.format("http://dmaap-mr%s", topicPath) ) .build(); diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index 3d43e817..48a12455 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; + +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockserver.client.MockServerClient; import org.mockserver.matchers.Times; import org.mockserver.verify.VerificationTimes; @@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance; +@ExtendWith(SystemStubsExtension.class) @Testcontainers class MessageRouterSubscriberIT { @Container @@ -71,16 +80,11 @@ class MessageRouterSubscriberIT { private static String EVENTS_PATH; private static String PROXY_MOCK_EVENTS_PATH; + private static final long REPEAT_SUBSCRIPTION = 20; private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String CONSUMER_GROUP = "group1"; private static final String CONSUMER_ID = "consumer200"; - private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" + - "{" + - "\"mrstatus\":3001," + - "\"helpURL\":\"http://onap.readthedocs.io\"," + - "\"message\":\"No such topic exists.-[%s]\"," + - "\"status\":404" + - "}"; + private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found"; private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n" + "{" + "\"requestError\":" @@ -94,22 +98,44 @@ class MessageRouterSubscriberIT { + "}" + "}"; - private MessageRouterPublisher publisher = DmaapClientFactory - .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); - private MessageRouterSubscriber subscriber = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - + private MessageRouterPublisher publisher; + private MessageRouterSubscriber subscriber; + Mono<MessageRouterSubscribeResponse> response; + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @BeforeAll static void setUp() { EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT); PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT); + //sleep introduced to wait till all containers are started + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @BeforeEach void set() { MOCK_SERVER_CLIENT.reset(); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("kafka.auto.offset.reset","earliest"); + publisher = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + subscriber = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + response=null; + } + + @AfterEach + void afterEach() { + if(publisher != null) + publisher.close(); + if(subscriber != null) + subscriber.close(); } - @Test void subscriber_shouldHandleNoSuchTopicException() { //given @@ -144,11 +170,13 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -171,11 +199,14 @@ class MessageRouterSubscriberIT { final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements); //when - registerTopic(publisher, publishRequest, subscriber, subscribeRequest); - Mono<MessageRouterSubscribeResponse> response = publisher - .put(publishRequest, jsonMessageBatch) - .then(subscriber.get(subscribeRequest)); - + publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION) + .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> { + if(!resp.items().isEmpty()) { + response = Mono.just(resp); + } + }); + }); + //then StepVerifier.create(response) .expectNext(expectedResponse) @@ -183,6 +214,7 @@ class MessageRouterSubscriberIT { .verify(); } + @Disabled @Test void subscriber_shouldExtractItemsFromResponse() { //given @@ -208,7 +240,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldSubscribeToTopic() { //given @@ -235,7 +268,8 @@ class MessageRouterSubscriberIT { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldHandleTimeoutException() { //given @@ -261,6 +295,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1)); } + @Disabled @Test void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() { //given @@ -298,6 +333,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() { //given @@ -336,6 +372,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() { //given @@ -383,6 +420,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5)); } + @Disabled @Test void subscriber_shouldHandleLastRetryError500() { //given @@ -416,6 +454,7 @@ class MessageRouterSubscriberIT { MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2)); } + @Disabled @Test void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() { //given @@ -445,6 +484,7 @@ class MessageRouterSubscriberIT { .verify(TIMEOUT); } + @Disabled @Test void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() { //given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java index e928f03c..db1fb4fc 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +24,27 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; import com.google.gson.JsonPrimitive; import io.vavr.collection.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.MockedStatic; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; @@ -39,10 +55,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Properties; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay; @@ -51,6 +76,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ +@ExtendWith(SystemStubsExtension.class) class MessageRouterSubscriberTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String ERROR_MESSAGE = "Something went wrong"; @@ -64,6 +90,10 @@ class MessageRouterSubscriberTest { private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409"; private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429"; private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500"; + + private static final String POLL_EXCEPTION_MESSAGE = "Poll Exception"; + private static final String TOPIC_NOT_FOUND_ERROR_MESSAGE = "404 Topic Not Found"; + private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP); @@ -85,13 +115,15 @@ class MessageRouterSubscriberTest { private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet(); private static final DummyHttpServer SERVER = initialize(); - private MessageRouterSubscriber sut = DmaapClientFactory - .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + private MessageRouterSubscriber sut; private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER); private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER); private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition); private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); - + static MockConsumer<String, String> mockConsumer;// = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + Properties prop = new Properties(); + static MockedStatic<Commons> commonsMock; + private static DummyHttpServer initialize() { return DummyHttpServer.start(routes -> routes .get(SUCCESS_RESP_PATH, (req, resp) -> @@ -103,9 +135,61 @@ class MessageRouterSubscriberTest { .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE)) .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))); } - + + @SystemStub + EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @BeforeAll + static void set() { + commonsMock = mockStatic(Commons.class); + } + @AfterEach + void afterEach() { + sut.close(); + } + @AfterAll + static void after() { + commonsMock.close(); + } + @BeforeEach + void setup() { + + when(Commons.setKafkaPropertiesFromSystemEnv(System.getenv())).thenReturn(prop); + mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + environmentVariables + .set("BOOTSTRAP_SERVERS", "localhost:9092") + .set("JAAS_CONFIG", "jaas.config"); + + sut = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); + + configureMockConsumer(); + sut.setConsumer(mockConsumer); + + + } + + private void configureMockConsumer() { + mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0))); + + HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); + beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L); + mockConsumer.updateBeginningOffsets(beginningOffsets); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I")); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like")); + mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza")); + } + + private void stubForTopicCheck(boolean response) { + + when(Commons.checkIfTopicIsPresentInKafka("TOPIC",MessageRouterSubscriberImpl.getAdminProps())).thenReturn(response); + when(Commons.getTopicFromTopicUrl("http://dmaap-mr/events/TOPIC")).thenReturn("TOPIC"); + } + @Test void subscriber_shouldGetCorrectResponse() { + + stubForTopicCheck(true); Mono<MessageRouterSubscribeResponse> response = sut .get(mrSuccessRequest); @@ -119,9 +203,49 @@ class MessageRouterSubscriberTest { StepVerifier.create(response) .expectNext(expectedResponse) .expectComplete() - .verify(TIMEOUT); + .verify(); } + + @Test + void subscriber_shouldGetCorrectResponse_ForConstructorWithoutDMaapParameters() throws Exception { + sut = new MessageRouterSubscriberImpl(); + sut.setConsumer(mockConsumer); + stubForTopicCheck(true); + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + + List<String> expectedItems = List.of("I", "like", "pizza"); + MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems.map(JsonPrimitive::new)) + .build(); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(); + } + + @Test + void whenTopicNotFound_shouldReturnError() { + stubForTopicCheck(false); + sut.setConsumer(null); + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + + MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .failReason(TOPIC_NOT_FOUND_ERROR_MESSAGE) + .build(); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Disabled @ParameterizedTest @CsvSource({ FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized", @@ -144,29 +268,60 @@ class MessageRouterSubscriberTest { @Test void subscriber_shouldParseCorrectResponse() { + stubForTopicCheck(true); final Flux<String> result = sut .getElements(mrSuccessRequest) .map(JsonElement::getAsString); - StepVerifier.create(result) .expectNext("I", "like", "pizza") .expectComplete() .verify(TIMEOUT); } - + + @Test + void whenSubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() { + stubForTopicCheck(true); + MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.schedulePollTask(() -> { + consumer.setPollException(new KafkaException(POLL_EXCEPTION_MESSAGE)); + }); + HashMap<TopicPartition, Long> startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition("TOPIC", 0); + startOffsets.put(tp, 0L); + consumer.updateBeginningOffsets(startOffsets); + sut.setConsumer(consumer); + + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> {throw new KafkaException(POLL_EXCEPTION_MESSAGE);}) + .withMessage(POLL_EXCEPTION_MESSAGE); + + StepVerifier.create(response) + .expectNext(ImmutableMessageRouterSubscribeResponse.builder().failReason(POLL_EXCEPTION_MESSAGE).build()) + .expectComplete() + .verify(TIMEOUT); + + } + + @Test void subscriber_shouldParseErrorResponse() { + stubForTopicCheck(false); + sut.setConsumer(null); Flux<String> result = sut .getElements(mrFailingRequest) .map(JsonElement::getAsString); - + StepVerifier.create(result) .expectError(IllegalStateException.class) .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldSubscribeCorrectly() { + Flux<String> subscriptionForElements = sut .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1)) .map(JsonElement::getAsString); @@ -176,7 +331,8 @@ class MessageRouterSubscriberTest { .expectComplete() .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldParseErrorWhenSubscribed() { Flux<String> subscriptionForElements = sut @@ -187,7 +343,8 @@ class MessageRouterSubscriberTest { .expectError(IllegalStateException.class) .verify(TIMEOUT); } - + + @Disabled @Test void subscriber_shouldHandleClientTimeoutError() { Duration requestTimeout = Duration.ofMillis(1); @@ -200,6 +357,7 @@ class MessageRouterSubscriberTest { .verify(TIMEOUT); } + @Disabled @Test void subscriber_shouldHandleConnectionError() { MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition); @@ -214,7 +372,7 @@ class MessageRouterSubscriberTest { private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) { return ImmutableMessageRouterSource.builder() .name("the topic") - .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port())) + .topicUrl(String.format("http://dmaap-mr/events/TOPIC")) .build(); } diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java index 72c35925..3d35c2ac 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +26,13 @@ import io.vavr.Tuple2; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; class CommonsTest { @@ -53,7 +59,26 @@ class CommonsTest { // then verifyBasicAuthHeader(basicAuthHeader, "Og=="); } - + + @Test + void shouldFetchTopicFromTopicURL() { + String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT"; + String expected = "unauthenticated.VES_PNFREG_OUTPUT"; + assertThat(getTopicFromTopicUrl(topicUrl)) + .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected) + .isEqualTo(expected); + } + + @Test + void shouldFetchTopicFromTopicUrlEndingWithSlash() { + String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT/"; + String expected = "unauthenticated.VES_PNFREG_OUTPUT"; + assertThat(getTopicFromTopicUrl(topicUrl)) + .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected) + .isEqualTo(expected); + } + + private AafCredentials create(String username, String password) { return ImmutableAafCredentials.builder() .username(username) diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java index 6c6ded16..2e169dc4 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,8 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMultimap; import io.vavr.collection.List; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; @@ -68,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 */ +@Disabled class MessageRouterPublisherImplTest { private static final Duration TIMEOUT = Duration.ofSeconds(5); private static final String TOPIC_URL = "https://dmaap-mr/TOPIC"; @@ -75,14 +79,17 @@ class MessageRouterPublisherImplTest { private static final String ERROR_MESSAGE = "Something went wrong"; private final RxHttpClient httpClient = mock(RxHttpClient.class); private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); - private final MessageRouterPublisher cut = new MessageRouterPublisherImpl( - httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); + private final MessageRouterPublisher cut; private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN); private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL); private final HttpResponse successHttpResponse = createHttpResponse("OK", 200); private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500); - + + private MessageRouterPublisherImplTest() throws Exception{ + cut = new MessageRouterPublisherImpl( + httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter); + } @Test void puttingElementsShouldYieldNonChunkedHttpRequest() { // given diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java index 006965c2..373424ba 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,11 +32,17 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonSyntaxException; import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMultimap; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; @@ -50,6 +57,8 @@ import java.net.ConnectException; * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since May 2019 */ + +@Disabled class MessageRouterSubscriberImplTest { private static final String ERROR_MESSAGE = "Something went wrong"; @@ -57,8 +66,7 @@ class MessageRouterSubscriberImplTest { private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class); private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault(); private final MessageRouterSubscriber - cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); - + cut; private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() .name("sample topic") @@ -96,7 +104,10 @@ class MessageRouterSubscriberImplTest { .rawBody("{}".getBytes()) .headers(HashMultimap.withSeq().empty()) .build(); - + private MessageRouterSubscriberImplTest() throws Exception{ + cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter); + } + @Test void getWithProperRequest_shouldReturnCorrectResponse() { // given diff --git a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml index 26eb1763..85e1b19e 100644 --- a/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml +++ b/rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml @@ -32,7 +32,7 @@ services: KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000 KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://localhost:9092 KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092 KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false' @@ -48,25 +48,6 @@ services: depends_on: - zookeeper - dmaap: - image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.18 - ports: - - "3904:3904" - - "3905:3905" - environment: - enableCadi: 'false' - volumes: - - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties - - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml - - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties - networks: - net: - aliases: - - dmaap - depends_on: - - zookeeper - - kafka - mockserver: image: mockserver/mockserver:mockserver-5.11.2 command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap @@ -74,8 +55,6 @@ services: - "1080:1090" networks: - net - depends_on: - - dmaap networks: net: |