diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 20f4c91c..8d88b0d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,13 +29,19 @@ import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -219,6 +226,53 @@ public interface BusConsumer { } /** + * Kafka based consumer. + */ + public static class KafkaConsumerWrapper extends FetchingBusConsumer { + + /** + * logger. + */ + private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + + /** + * Kafka consumer. + */ + private KafkaConsumer<String, String> consumer; + + /** + * Kafka Consumer Wrapper. + * BusTopicParam object contains the following parameters + * servers messaging bus hosts. + * topic topic + * + * @param busTopicParams - The parameters for the bus topic + * @throws GeneralSecurityException - Security exception + * @throws MalformedURLException - Malformed URL exception + */ + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() throws IOException { + // TODO: Not implemented yet + return new ArrayList<>(); + } + + @Override + public void close() { + super.close(); + this.consumer.close(); + } + + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } + } + + /** * MR based consumer. */ public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { |