diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
2 files changed, 10 insertions, 1 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index dfb12617c..be1b943d4 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -164,7 +164,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { while (consumerThread.isAlive() && !stopOrderedFlag) { try { final ConsumerRecords<String, String> records = - kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime()); + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); for (final ConsumerRecord<String, String> record : records) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 9d7cc77f3..7c24ce1aa 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties; @@ -263,6 +264,14 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter } /** + * Gets the consumer poll duration. + * @return The poll duration + */ + public Duration getConsumerPollDuration() { + return Duration.ofMillis(consumerPollTime); + } + + /** * Gets the consumer topic list. * * @return the consumer topic list |