summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event
diff options
context:
space:
mode:
authorJorge Hernandez <jh1730@att.com>2018-09-17 14:38:09 +0000
committerGerrit Code Review <gerrit@onap.org>2018-09-17 14:38:09 +0000
commit421672e34425963b97184104416bb131d5e7903a (patch)
treed74a9827ddc5d1c7bf899f5cf63ec2da37d0c502 /plugins/plugins-event
parent952d4f5aab9913f961e77fa0457da5d38f31d94f (diff)
parente13ff2c6faf63caab2d47fa63777e965e32ec642 (diff)
Merge "Re-implement Kafka tests that periodically fail"
Diffstat (limited to 'plugins/plugins-event')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java2
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java9
2 files changed, 10 insertions, 1 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index dfb12617c..be1b943d4 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -164,7 +164,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
final ConsumerRecords<String, String> records =
- kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime());
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
for (final ConsumerRecord<String, String> record : records) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 9d7cc77f3..7c24ce1aa 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -20,6 +20,7 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
@@ -263,6 +264,14 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
/**
+ * Gets the consumer poll duration.
+ * @return The poll duration
+ */
+ public Duration getConsumerPollDuration() {
+ return Duration.ofMillis(consumerPollTime);
+ }
+
+ /**
* Gets the consumer topic list.
*
* @return the consumer topic list