diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-08-25 15:00:58 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-09-21 10:54:56 +0100 |
commit | 775b9f1e14a246b2df0a65bd63c0775120659f35 (patch) | |
tree | 7f768cbbf9859fe420f6b4e923105425837af85b /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java | |
parent | b433dd58dc50b5c59f84ea18908b5e1a0f25f78a (diff) |
Publish and Subscribe to Kafka topic
Issue-ID: POLICY-4134
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java | 59 |
1 files changed, 47 insertions, 12 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e0df7095..fe9bab23 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -32,13 +32,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -155,22 +156,45 @@ public interface BusPublisher { public static class KafkaPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private String topic; /** - * The actual Kafka publisher. + * Kafka publisher. */ - private final KafkaProducer producer; + private Producer<String, String> producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // TODO Setting of topic parameters is not implemented yet. - //Setup Properties for Kafka Producer - Properties kafkaProps = new Properties(); - this.producer = new KafkaProducer(kafkaProps); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + this.topic = busTopicParams.getTopic(); + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + + producer = new KafkaProducer<>(kafkaProps); } @Override @@ -178,7 +202,18 @@ public interface BusPublisher { if (message == null) { throw new IllegalArgumentException("No message provided"); } - // TODO Sending messages is not implemented yet + + try { + //Create the record + ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, + UUID.randomUUID().toString(), message); + + this.producer.send(record); + producer.flush(); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } return true; } @@ -186,7 +221,7 @@ public interface BusPublisher { public void close() { logger.info("{}: CLOSE", this); - try (this.producer) { + try { this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); |