diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index b8362b83..2a651ee7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -18,6 +18,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.net.MalformedURLException; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; @@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; */ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + protected Map<String, String> additionalProps = null; + /** * Constructor. * @@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource */ public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - this.init(); + this.additionalProps = busTopicParams.getAdditionalProps(); + try { + this.init(); + } catch (Exception e) { + throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e); + } } /** * Initialize the Cambria client. */ @Override - public void init() { - this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder() + public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .useHttps(this.useHttps) - .build()); + .fetchTimeout(this.fetchTimeout) + .consumerGroup(this.consumerGroup) + .useHttps(this.useHttps); + + this.consumer = new BusConsumer.KafkaConsumerWrapper(builder + .additionalProps(this.additionalProps) + .build()); } @Override |