From f134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c Mon Sep 17 00:00:00 2001 From: liamfallon Date: Thu, 2 Apr 2020 20:54:52 +0100 Subject: Fix failing Kafka tests All the Kafka components need something near 10 seconds to come up completely. This review tweaks the timing to allow the test Kafka server to come up and to allow the consumers to connect to it. Issue-ID: POLICY-2106 Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789 Signed-off-by: liamfallon --- .../event/carrier/kafka/ApexKafkaConsumer.java | 65 ++++++++-------------- 1 file changed, 24 insertions(+), 41 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 947dd5466..591f83237 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The Kafka consumer used to receive events using Kafka - private KafkaConsumer kafkaConsumer; - /** * {@inheritDoc}. */ @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the Kafka Properties if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { - LOGGER.warn("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); throw new ApexEventException("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); - } - kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters - .getCarrierTechnologyParameters(); - - // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); + + consumerParameters.getCarrierTechnologyParameters().getClass().getName() + + "\" are not applicable to a Kafka consumer"); } + kafkaConsumerProperties = + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); } - /** * {@inheritDoc}. */ @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); - } + try (KafkaConsumer kafkaConsumer = + new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) { + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name, + kafkaConsumerProperties.getConsumerTopicList()); + } - // The endless loop that receives events over Kafka - while (consumerThread.isAlive() && !stopOrderedFlag) { - try { - final ConsumerRecords records = kafkaConsumer - .poll(kafkaConsumerProperties.getConsumerPollDuration()); - for (final ConsumerRecord record : records) { - traceIfTraceEnabled(record); - eventReceiver.receiveEvent(new Properties(), record.value()); + // The endless loop that receives events over Kafka + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + final ConsumerRecords records = + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); + for (final ConsumerRecord record : records) { + traceIfTraceEnabled(record); + eventReceiver.receiveEvent(new Properties(), record.value()); + } + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } - } catch (final Exception e) { - LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } } - - if (!consumerThread.isInterrupted()) { - kafkaConsumer.close(); - } } /** @@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { private void traceIfTraceEnabled(final ConsumerRecord record) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); + this.getClass().getName() + ":" + this.name, record.key(), record.value()); } } -- cgit 1.2.3-korg