From 775b9f1e14a246b2df0a65bd63c0775120659f35 Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Thu, 25 Aug 2022 15:00:58 +0100 Subject: Publish and Subscribe to Kafka topic Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a --- .../event/comm/bus/internal/BusPublisher.java | 59 +++++++++++++++++----- 1 file changed, 47 insertions(+), 12 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e0df7095..fe9bab23 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -32,13 +32,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -155,22 +156,45 @@ public interface BusPublisher { public static class KafkaPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private String topic; /** - * The actual Kafka publisher. + * Kafka publisher. */ - private final KafkaProducer producer; + private Producer producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // TODO Setting of topic parameters is not implemented yet. - //Setup Properties for Kafka Producer - Properties kafkaProps = new Properties(); - this.producer = new KafkaProducer(kafkaProps); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + this.topic = busTopicParams.getTopic(); + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + + producer = new KafkaProducer<>(kafkaProps); } @Override @@ -178,7 +202,18 @@ public interface BusPublisher { if (message == null) { throw new IllegalArgumentException("No message provided"); } - // TODO Sending messages is not implemented yet + + try { + //Create the record + ProducerRecord record = new ProducerRecord(topic, + UUID.randomUUID().toString(), message); + + this.producer.send(record); + producer.flush(); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } return true; } @@ -186,7 +221,7 @@ public interface BusPublisher { public void close() { logger.info("{}: CLOSE", this); - try (this.producer) { + try { this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); -- cgit 1.2.3-korg