From 775b9f1e14a246b2df0a65bd63c0775120659f35 Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Thu, 25 Aug 2022 15:00:58 +0100 Subject: Publish and Subscribe to Kafka topic Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a --- .../comm/bus/IndexedKafkaTopicSourceFactory.java | 1 + .../event/comm/bus/internal/BusConsumer.java | 62 ++++++++++++++++++++-- .../event/comm/bus/internal/BusPublisher.java | 59 +++++++++++++++----- .../comm/bus/internal/InlineKafkaTopicSink.java | 5 ++ .../internal/SingleThreadedKafkaTopicSource.java | 24 +++++++-- .../properties/PolicyEndPointProperties.java | 1 - .../common/endpoints/utils/KafkaPropertyUtils.java | 6 ++- 7 files changed, 135 insertions(+), 23 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java index 47279d47..45a8be3f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -65,6 +65,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { } var kafkaTopicSource = makeSource(busTopicParams); + kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource); return kafkaTopicSource; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 8d88b0d9..ee41150f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -32,6 +32,8 @@ import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -42,6 +44,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -235,10 +239,13 @@ public interface BusConsumer { */ private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + /** * Kafka consumer. */ - private KafkaConsumer consumer; + protected KafkaConsumer consumer; + protected Properties kafkaProps; /** * Kafka Consumer Wrapper. @@ -250,20 +257,67 @@ public interface BusConsumer { * @throws GeneralSecurityException - Security exception * @throws MalformedURLException - Malformed URL exception */ - public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { super(busTopicParams); + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + busTopicParams.getServers().get(0)); + + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + + if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + } + if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + } + if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); + } + consumer = new KafkaConsumer<>(kafkaProps); + //Subscribe to the topic + consumer.subscribe(Arrays.asList(busTopicParams.getTopic())); } @Override public Iterable fetch() throws IOException { - // TODO: Not implemented yet - return new ArrayList<>(); + ConsumerRecords records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); + if (records == null || records.count() <= 0) { + return Collections.emptyList(); + } + List messages = new ArrayList<>(records.count()); + try { + for (TopicPartition partition : records.partitions()) { + List> partitionRecords = records.records(partition); + for (ConsumerRecord record : partitionRecords) { + messages.add(record.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } catch (Exception e) { + logger.error("{}: cannot fetch because of {}", this, e.getMessage()); + sleepAfterFetchFailure(); + throw e; + } + return messages; } @Override public void close() { super.close(); this.consumer.close(); + logger.info("Kafka Consumer exited {}", this); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e0df7095..fe9bab23 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -32,13 +32,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -155,22 +156,45 @@ public interface BusPublisher { public static class KafkaPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private String topic; /** - * The actual Kafka publisher. + * Kafka publisher. */ - private final KafkaProducer producer; + private Producer producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // TODO Setting of topic parameters is not implemented yet. - //Setup Properties for Kafka Producer - Properties kafkaProps = new Properties(); - this.producer = new KafkaProducer(kafkaProps); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + this.topic = busTopicParams.getTopic(); + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + + producer = new KafkaProducer<>(kafkaProps); } @Override @@ -178,7 +202,18 @@ public interface BusPublisher { if (message == null) { throw new IllegalArgumentException("No message provided"); } - // TODO Sending messages is not implemented yet + + try { + //Create the record + ProducerRecord record = new ProducerRecord(topic, + UUID.randomUUID().toString(), message); + + this.producer.send(record); + producer.flush(); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } return true; } @@ -186,7 +221,7 @@ public interface BusPublisher { public void close() { logger.info("{}: CLOSE", this); - try (this.producer) { + try { this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index b564229b..6574d408 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -18,6 +18,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.slf4j.Logger; @@ -34,6 +35,8 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop */ private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + protected Map additionalProps = null; + /** * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned * attributes. @@ -47,6 +50,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop */ public InlineKafkaTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); + this.additionalProps = busTopicParams.getAdditionalProps(); } /** @@ -59,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop .servers(this.servers) .topic(this.effectiveTopic) .useHttps(this.useHttps) + .additionalProps(this.additionalProps) .build()); logger.info("{}: KAFKA SINK created", this); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index b8362b83..2a651ee7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -18,6 +18,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.net.MalformedURLException; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; @@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; */ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + protected Map additionalProps = null; + /** * Constructor. * @@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource */ public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - this.init(); + this.additionalProps = busTopicParams.getAdditionalProps(); + try { + this.init(); + } catch (Exception e) { + throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e); + } } /** * Initialize the Cambria client. */ @Override - public void init() { - this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder() + public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .useHttps(this.useHttps) - .build()); + .fetchTimeout(this.fetchTimeout) + .consumerGroup(this.consumerGroup) + .useHttps(this.useHttps); + + this.consumer = new BusConsumer.KafkaConsumerWrapper(builder + .additionalProps(this.additionalProps) + .build()); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index 49dff287..46a6c398 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -129,7 +129,6 @@ public final class PolicyEndPointProperties { public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - /* Topic Sink Values */ /** diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java index 3e62f98f..113a4bd1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -24,9 +24,13 @@ package org.onap.policy.common.endpoints.utils; import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; @@ -47,7 +51,7 @@ public class KafkaPropertyUtils { public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { final List serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); - //TODO More Kafka properties to be added + return BusTopicParams.builder() .servers(serverList) .topic(topic) -- cgit 1.2.3-korg