From e13ff2c6faf63caab2d47fa63777e965e32ec642 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Fri, 14 Sep 2018 23:05:56 +0100 Subject: Re-implement Kafka tests that periodically fail The Kafla integration tests fail, this change re-implements the tests using a test framework from salesforce.com i Issue-ID: POLICY-1034 Change-Id: Iffcc9e0a9f419c8ec439771be7a7a58faa2f9860 Signed-off-by: liamfallon --- .../apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 2 +- .../event/carrier/kafka/KafkaCarrierTechnologyParameters.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) (limited to 'plugins/plugins-event/plugins-event-carrier') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index dfb12617c..be1b943d4 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -164,7 +164,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { while (consumerThread.isAlive() && !stopOrderedFlag) { try { final ConsumerRecords records = - kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime()); + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); for (final ConsumerRecord record : records) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 9d7cc77f3..7c24ce1aa 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties; @@ -262,6 +263,14 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter return consumerPollTime; } + /** + * Gets the consumer poll duration. + * @return The poll duration + */ + public Duration getConsumerPollDuration() { + return Duration.ofMillis(consumerPollTime); + } + /** * Gets the consumer topic list. * -- cgit 1.2.3-korg