diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 8bf805bf..e0df7095 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -144,6 +150,58 @@ public interface BusPublisher { } /** + * Kafka based library publisher. + */ + public static class KafkaPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + + /** + * The actual Kafka publisher. + */ + private final KafkaProducer producer; + + /** + * Constructor. + * + * @param busTopicParams topic parameters + */ + public KafkaPublisherWrapper(BusTopicParams busTopicParams) { + // TODO Setting of topic parameters is not implemented yet. + //Setup Properties for Kafka Producer + Properties kafkaProps = new Properties(); + this.producer = new KafkaProducer(kafkaProps); + } + + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + // TODO Sending messages is not implemented yet + return true; + } + + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try (this.producer) { + this.producer.close(); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + + @Override + public String toString() { + return "KafkaPublisherWrapper []"; + } + + } + + /** * DmaapClient library wrapper. */ public abstract class DmaapPublisherWrapper implements BusPublisher { |