diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-01 07:15:00 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-07-22 20:17:46 +0100 |
commit | 2a2b5d085876480c1b0d9470a57c6cab4f51008c (patch) | |
tree | 64bd2ded017ad43bd995686e2e7b8f1ffa412a40 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | |
parent | 15ede09d6ef4b52db05fab534eed7192991b1f98 (diff) |
Introduce Custom Kafka End point
Issue-ID: POLICY-4133
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: I2745f3af97e9bb83d94c5cb6d29dfd452d315506
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 20f4c91c..8d88b0d9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,6 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,13 +29,19 @@ import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -219,6 +226,53 @@ public interface BusConsumer { } /** + * Kafka based consumer. + */ + public static class KafkaConsumerWrapper extends FetchingBusConsumer { + + /** + * logger. + */ + private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + + /** + * Kafka consumer. + */ + private KafkaConsumer<String, String> consumer; + + /** + * Kafka Consumer Wrapper. + * BusTopicParam object contains the following parameters + * servers messaging bus hosts. + * topic topic + * + * @param busTopicParams - The parameters for the bus topic + * @throws GeneralSecurityException - Security exception + * @throws MalformedURLException - Malformed URL exception + */ + public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() throws IOException { + // TODO: Not implemented yet + return new ArrayList<>(); + } + + @Override + public void close() { + super.close(); + this.consumer.close(); + } + + @Override + public String toString() { + return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } + } + + /** * MR based consumer. */ public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { |