diff options
author | liamfallon <liam.fallon@ericsson.com> | 2018-09-14 23:05:56 +0100 |
---|---|---|
committer | liamfallon <liam.fallon@ericsson.com> | 2018-09-15 20:00:59 +0100 |
commit | e13ff2c6faf63caab2d47fa63777e965e32ec642 (patch) | |
tree | df11a546dbb03af6cf7a9eeaddd9d4a8275ce4cc /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java | |
parent | a65e4772f4557a109917532b2d9c49680ce3bb15 (diff) |
Re-implement Kafka tests that periodically fail
The Kafla integration tests fail, this change re-implements
the tests using a test framework from salesforce.com
i
Issue-ID: POLICY-1034
Change-Id: Iffcc9e0a9f419c8ec439771be7a7a58faa2f9860
Signed-off-by: liamfallon <liam.fallon@ericsson.com>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java')
2 files changed, 10 insertions, 1 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index dfb12617c..be1b943d4 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -164,7 +164,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { while (consumerThread.isAlive() && !stopOrderedFlag) { try { final ConsumerRecords<String, String> records = - kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime()); + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); for (final ConsumerRecord<String, String> record : records) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 9d7cc77f3..7c24ce1aa 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties; @@ -263,6 +264,14 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter } /** + * Gets the consumer poll duration. + * @return The poll duration + */ + public Duration getConsumerPollDuration() { + return Duration.ofMillis(consumerPollTime); + } + + /** * Gets the consumer topic list. * * @return the consumer topic list |