diff options
author | 2023-08-11 19:45:44 +0530 | |
---|---|---|
committer | 2023-09-11 12:09:33 +0000 | |
commit | 86513b7ca5b8cc8ba93bf23176aeac57656b7c66 (patch) | |
tree | 0b1a0499dbccbb937c8eca7b2cef075ad63134df /rest-services/dmaap-client/src/main | |
parent | 9d8a9326758a162eb26236a1dd9de1c29c504554 (diff) |
[DCAEGEN2] Use kafka API directly in DMaaP library
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs.
Issue-ID: DCAEGEN2-3364
Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'rest-services/dmaap-client/src/main')
6 files changed, 417 insertions, 27 deletions
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java index ee4f6d38..6e7f6049 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +36,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRou import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; @@ -46,26 +49,37 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.co * @since 1.1.4 */ public final class DmaapClientFactory { - + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapClientFactory.class); private DmaapClientFactory() { } public static @NotNull MessageRouterPublisher createMessageRouterPublisher( @NotNull MessageRouterPublisherConfig clientConfiguration) { - return new MessageRouterPublisherImpl( + try { + return new MessageRouterPublisherImpl( createHttpClient(clientConfiguration), clientConfiguration.maxBatchSize(), clientConfiguration.maxBatchDuration(), new ClientErrorReasonPresenter()); + } catch (Exception e) { + LOGGER.error("Error while creating the Message Router Publisher."); + return null; + } } public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber( @NotNull MessageRouterSubscriberConfig clientConfiguration) { - return new MessageRouterSubscriberImpl( - createHttpClient(clientConfiguration), - clientConfiguration.gsonInstance(), - new ClientErrorReasonPresenter()); + try { + return new MessageRouterSubscriberImpl( + createHttpClient(clientConfiguration), + clientConfiguration.gsonInstance(), + new ClientErrorReasonPresenter()); + } catch (Exception e) { + LOGGER.error("Error while creating the Message Router Subscriber."); + return null; + } + } private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) { diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java index 08825b4c..f98e8198 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; + +import io.vavr.collection.List; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.Producer; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; @@ -30,5 +38,7 @@ import reactor.core.publisher.Flux; * @since 1.1.4 */ public interface MessageRouterPublisher { + void close(); + void setKafkaProducer(Producer<String, String> kafkaProducer); Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items); } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java index d91535d3..fbf90d9e 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +22,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; import com.google.gson.JsonElement; + +import io.vavr.collection.List; + import java.time.Duration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import reactor.core.publisher.Flux; @@ -33,6 +41,8 @@ import reactor.core.publisher.Mono; */ public interface MessageRouterSubscriber { + void setConsumer(Consumer<String, String> consumer); + void close(); Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request); default Flux<JsonElement> getElements(MessageRouterSubscribeRequest request) { @@ -49,4 +59,5 @@ public interface MessageRouterSubscriber { default Flux<JsonElement> subscribeForElements(MessageRouterSubscribeRequest request, Duration period) { return Flux.interval(period).concatMap(i->getElements(request)); } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java index 9f534d8f..4ea80e48 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import io.vavr.Tuple; import io.vavr.Tuple2; import io.vavr.control.Option; + +import org.apache.kafka.clients.admin.AdminClient; import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since April 2019 */ -final class Commons { +public final class Commons { + static String commonInURL = "/events/"; + static String KAFKA_PROPS_PREFIX = "kafka."; + + private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class); + private static AdminClient kafkaAdminClient; + private static Map<String,Object> map = new HashMap<>(); + static { + map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + map.put("max.poll.interval.ms", 300000); + map.put("heartbeat.interval.ms", 60000); + map.put("session.timeout.ms", 240000); + map.put("max.poll.records", 1000); + } private Commons() { } @@ -67,4 +92,69 @@ final class Commons { .map(s -> s.getBytes(StandardCharsets.UTF_8)) .getOrElse(new byte[0]); } + /** + * Extracts the topic name from the topicUrl. + * + * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p> + * + * @param topicUrl + * @return topic + */ + public static String getTopicFromTopicUrl(String topicUrl) { + if(topicUrl.endsWith("/")) { + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/")); + } + return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length()); + } + + public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) { + Map<String, Object> propMap= getKafkaPropertiesMap(envs); + Properties props = new Properties(); + propMap.forEach((k ,v) -> props.put(k, v)); + map.forEach((k,v) -> { + if(!propMap.containsKey(k)) { + props.put(k, v); + } + }); + + return props; + } + + static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){ + Map<String, Object> propMap = new HashMap<>(); + envs.forEach((k ,v) -> { + if(k.startsWith(KAFKA_PROPS_PREFIX)){ + String key = k.substring(KAFKA_PROPS_PREFIX.length()); + propMap.put(key, v); + } + }); + return propMap; + } + + public static void closeKafkaAdminClient() { + if(kafkaAdminClient != null) { + LOGGER.info("Closing the Kafka AdminClient."); + kafkaAdminClient.close(); + kafkaAdminClient=null; + } + } + + public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) { + if(kafkaAdminClient == null) { + kafkaAdminClient = AdminClient.create(adminProps); + } + try { + for (String name : kafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return true; + } + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return false; + } + return false; + } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java index 534fca6b..329c2a36 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +23,28 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + import io.netty.handler.timeout.ReadTimeoutException; import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; +import io.vavr.collection.Stream; import io.vavr.control.Option; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -54,9 +71,14 @@ import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitExcepti import java.net.ConnectException; import java.time.Duration; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -67,24 +89,136 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { private final int maxBatchSize; private final Duration maxBatchDuration; private final ClientErrorReasonPresenter clientErrorReasonPresenter; - - private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); - - public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) { + + private static Properties props; + private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; + private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class); + private static Producer<String, String> kafkaProducer; + public static Future<RecordMetadata> future; + static boolean flag; + static Exception exception; + public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.maxBatchSize = maxBatchSize; this.maxBatchDuration = maxBatchDuration; this.clientErrorReasonPresenter = clientErrorReasonPresenter; + setProperties(); } - + + /** + * New constructor that does not take DMaaP parameters as arguments. + * + * @throws Exception + */ + public MessageRouterPublisherImpl() throws Exception { + this.httpClient = null; + this.maxBatchSize = 0; + this.maxBatchDuration = null; + this.clientErrorReasonPresenter = null; + setProperties(); + } + +// @Override +// public Flux<MessageRouterPublishResponse> put( +// MessageRouterPublishRequest request, +// Flux<? extends JsonElement> items) { +// return items.bufferTimeout(maxBatchSize, maxBatchDuration) +// .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems))); +// } + + @Override public Flux<MessageRouterPublishResponse> put( MessageRouterPublishRequest request, Flux<? extends JsonElement> items) { - return items.bufferTimeout(maxBatchSize, maxBatchDuration) - .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems))); + flag = true; + exception=null; + future = null; + List<String> batch = getBatch(items); + String topic = getTopicFromTopicUrl(request.sinkDefinition().topicUrl()); + LOGGER.info("Topic extracted from URL {} is : {} ",request.sinkDefinition().topicUrl(),topic); + LOGGER.info("Sending a batch of {} items for topic {} to kafka", batch.size(),topic); + LOGGER.trace("The items to be sent: {}", batch); + if(kafkaProducer == null) { + kafkaProducer = new KafkaProducer<>(props); + } + Flux<MessageRouterPublishResponse> response; + try { + + for (String msg : batch) { + ProducerRecord<String, String> data = + new ProducerRecord<>(topic, msg); + future = kafkaProducer.send(data,new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + + if(e != null) { + flag=false; + exception = e; + } + } + }); + } + if(flag) { + LOGGER.info("Sent a batch of {} items for topic {} to kafka", batch.size(),topic); + response = Flux.just(ImmutableMessageRouterPublishResponse.builder().items(List.ofAll(items.collectList().block())).build()); + }else { + throw exception; + } + }catch(Exception e) { + LOGGER.error("Error while publishing the messages : {}",e.getStackTrace()); + response = Flux.just(ImmutableMessageRouterPublishResponse.builder() + .failReason(e.getMessage()) + .build()); + } + return response; } - + + @Override + public void close() { + LOGGER.info("Closing the Kafka Producer"); + if(kafkaProducer != null) { + kafkaProducer.close(); + kafkaProducer=null; + } + } + + @Override + public void setKafkaProducer(Producer<String, String> kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + public static Future<RecordMetadata> getFuture(){ + return future; + } + + void setProperties() throws Exception { + props = setKafkaPropertiesFromSystemEnv(System.getenv()); + + if(System.getenv(kafkaBootstrapServers) == null) { + LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); + throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); + }else { + props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + } + if(System.getenv("JAAS_CONFIG") == null) { + LOGGER.info("Not using any authentication for kafka interaction"); + }else { + LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + } + + static List<String> getBatch(Flux<? extends JsonElement> items){ + java.util.List<String> list = new ArrayList<>(); + items.map(msg -> msg.toString()).collectList().subscribe(data -> list.addAll(data)); + return List.ofAll(list); + + } + private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr( MessageRouterPublishRequest request, List<JsonElement> batch) { @@ -153,4 +287,5 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher { .getOrElse(HashMap.empty()); return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString()); } + } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java index d98e8d3a..c90a8064 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java @@ -3,6 +3,7 @@ * DCAEGEN2-SERVICES-SDK * ========================================================= * Copyright (C) 2019-2021 Nokia. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved. * ========================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +30,16 @@ import io.vavr.collection.HashMap; import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.control.Option; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.jetbrains.annotations.NotNull; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; @@ -40,19 +51,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka; +import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -63,27 +83,134 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { private final Gson gson; private final ClientErrorReasonPresenter clientErrorReasonPresenter; private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class); - + private static Properties props; + private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS"; + private static Consumer<String, String> consumer; + public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson, - ClientErrorReasonPresenter clientErrorReasonPresenter) { + ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception { this.httpClient = httpClient; this.gson = gson; this.clientErrorReasonPresenter = clientErrorReasonPresenter; + setProperties(); + } + + /** + * New constructor that does not take DMaaP parameters as arguments. + * + * @throws Exception + */ + public MessageRouterSubscriberImpl() throws Exception { + this.httpClient = null; + this.gson = null; + this.clientErrorReasonPresenter = null; + setProperties(); } @Override public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) { - LOGGER.debug("Requesting new items from DMaaP MR: {}", request); - return httpClient.call(buildGetHttpRequest(request)) - .map(this::buildGetResponse) - .doOnError(ReadTimeoutException.class, - e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) - .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) - .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) - .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) - .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); + LOGGER.info("Requesting new items from DMaaP MR: {}", request); + String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl()); + + String fakeGroupName = request.consumerGroup(); + props.put("client.id", request.consumerId()); + props.put("group.id", fakeGroupName); + + try{ + if (consumer == null) { + if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) { + LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .failReason("404 Topic Not Found") + .build()); + } + consumer = getKafkaConsumer(props); + consumer.subscribe(Arrays.asList(topic)); + } + ArrayList<String> msgs = new ArrayList<>(); + + ConsumerRecords<String, String> records = null; + synchronized (consumer) { + records = consumer.poll(Duration.ofMillis(500)); + } + for (ConsumerRecord<String, String> rec : records) { + msgs.add(rec.value()); + } + List<JsonElement> list = List.ofAll(msgs).map(r -> JsonParser.parseString(r)); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .items(list) + .build()); + } catch(Exception e) { + LOGGER.error("Error while consuming the messages : {}",e.getMessage()); + return Mono.just(ImmutableMessageRouterSubscribeResponse.builder() + .failReason(e.getMessage()) + .build()); + } } - + + @Override + public void setConsumer(Consumer<String, String> consumer) { + MessageRouterSubscriberImpl.consumer = consumer; + } + + public static KafkaConsumer<String, String> getKafkaConsumer(Properties props){ + return new KafkaConsumer<>(props); + } + + @Override + public void close(){ + if(consumer != null) { + LOGGER.info("Closing the Kafka Consumer"); + consumer.close(); + consumer = null; + } + Commons.closeKafkaAdminClient(); + } + + void setProperties() throws Exception { + props = setKafkaPropertiesFromSystemEnv(System.getenv()); + + if(System.getenv(kafkaBootstrapServers) == null) { + LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing"); + throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing"); + }else { + props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + } + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false); + + if(System.getenv("JAAS_CONFIG") == null) { + LOGGER.info("Not using any authentication for kafka interaction"); + }else { + LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + } + +// @Override +// public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) { +// LOGGER.debug("Requesting new items from DMaaP MR: {}", request); +// return httpClient.call(buildGetHttpRequest(request)) +// .map(this::buildGetResponse) +// .doOnError(ReadTimeoutException.class, +// e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e)) +// .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT)) +// .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage())) +// .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE)) +// .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse()))); +// } + public static Properties getAdminProps() { + Properties adminProps = new Properties(); + adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers)); + if(System.getenv("JAAS_CONFIG") != null) { + adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name); + adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()); + adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")); + } + return adminProps; + } + private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) { return ImmutableHttpRequest.builder() .method(HttpMethod.GET) @@ -132,4 +259,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber { .map(HashMap::of) .getOrElse(HashMap.empty()); } + + + } |