aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
diff options
context:
space:
mode:
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-08-25 15:00:58 +0100
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-09-21 10:54:56 +0100
commit775b9f1e14a246b2df0a65bd63c0775120659f35 (patch)
tree7f768cbbf9859fe420f6b4e923105425837af85b /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
parentb433dd58dc50b5c59f84ea18908b5e1a0f25f78a (diff)
Publish and Subscribe to Kafka topic
Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech> Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java59
1 files changed, 47 insertions, 12 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index e0df7095..fe9bab23 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -32,13 +32,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.record.CompressionType;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
@@ -155,22 +156,45 @@ public interface BusPublisher {
public static class KafkaPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+ private String topic;
/**
- * The actual Kafka publisher.
+ * Kafka publisher.
*/
- private final KafkaProducer producer;
+ private Producer<String, String> producer;
+ protected Properties kafkaProps;
/**
- * Constructor.
+ * Kafka Publisher Wrapper.
*
* @param busTopicParams topic parameters
*/
- public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
- // TODO Setting of topic parameters is not implemented yet.
- //Setup Properties for Kafka Producer
- Properties kafkaProps = new Properties();
- this.producer = new KafkaProducer(kafkaProps);
+ protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ this.topic = busTopicParams.getTopic();
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+ if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+
+ producer = new KafkaProducer<>(kafkaProps);
}
@Override
@@ -178,7 +202,18 @@ public interface BusPublisher {
if (message == null) {
throw new IllegalArgumentException("No message provided");
}
- // TODO Sending messages is not implemented yet
+
+ try {
+ //Create the record
+ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
+ UUID.randomUUID().toString(), message);
+
+ this.producer.send(record);
+ producer.flush();
+ } catch (Exception e) {
+ logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+ return false;
+ }
return true;
}
@@ -186,7 +221,7 @@ public interface BusPublisher {
public void close() {
logger.info("{}: CLOSE", this);
- try (this.producer) {
+ try {
this.producer.close();
} catch (Exception e) {
logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);