Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 65 |
1 file changed, 24 insertions, 41 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
     // The event receiver that will receive events from this consumer
     private ApexEventReceiver eventReceiver;
 
-    // The Kafka consumer used to receive events using Kafka
-    private KafkaConsumer<String, String> kafkaConsumer;
-
     /**
      * {@inheritDoc}.
      */
     @Override
     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
-            final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+        final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
         this.eventReceiver = incomingEventReceiver;
         this.name = consumerName;
 
         // Check and get the Kafka Properties
         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
-            LOGGER.warn("specified consumer properties of type \""
-                    + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
-                    + "\" are not applicable to a Kafka consumer");
             throw new ApexEventException("specified consumer properties of type \""
-                    + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
-                    + "\" are not applicable to a Kafka consumer");
-        }
-        kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
-                .getCarrierTechnologyParameters();
-
-        // Kick off the Kafka consumer
-        kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
-        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
-                    + kafkaConsumerProperties.getConsumerTopicList());
+                + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+                + "\" are not applicable to a Kafka consumer");
         }
+        kafkaConsumerProperties =
+            (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
     }
 
-
     /**
      * {@inheritDoc}.
      */
     @Override
     public void run() {
         // Kick off the Kafka consumer
-        kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
-        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
-                    + kafkaConsumerProperties.getConsumerTopicList());
-        }
+        try (KafkaConsumer<String, String> kafkaConsumer =
+            new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+            kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+                    kafkaConsumerProperties.getConsumerTopicList());
+            }
 
-        // The endless loop that receives events over Kafka
-        while (consumerThread.isAlive() && !stopOrderedFlag) {
-            try {
-                final ConsumerRecords<String, String> records = kafkaConsumer
-                        .poll(kafkaConsumerProperties.getConsumerPollDuration());
-                for (final ConsumerRecord<String, String> record : records) {
-                    traceIfTraceEnabled(record);
-                    eventReceiver.receiveEvent(new Properties(), record.value());
+            // The endless loop that receives events over Kafka
+            while (consumerThread.isAlive() && !stopOrderedFlag) {
+                try {
+                    final ConsumerRecords<String, String> records =
+                        kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+                    for (final ConsumerRecord<String, String> record : records) {
+                        traceIfTraceEnabled(record);
+                        eventReceiver.receiveEvent(new Properties(), record.value());
+                    }
+                } catch (final Exception e) {
+                    LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
                 }
-            } catch (final Exception e) {
-                LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
             }
         }
-
-        if (!consumerThread.isInterrupted()) {
-            kafkaConsumer.close();
-        }
     }
 
     /**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
     private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
-                    this.getClass().getName() + ":" + this.name, record.key(), record.value());
+                this.getClass().getName() + ":" + this.name, record.key(), record.value());
         }
     }
 }
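The change above drops the kafkaConsumer field (previously created in init() and created again in run()) and instead builds the consumer locally in run() inside a try-with-resources block, so it is closed on every exit path rather than only when the consumer thread has not been interrupted. The sketch below shows the same try-with-resources polling pattern in isolation; the broker address, group id, and topic name are illustrative placeholders, not values taken from the Apex Kafka carrier configuration.

// Minimal, self-contained sketch of the try-with-resources polling pattern
// adopted in run(); configuration values below are placeholders only.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TryWithResourcesConsumerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "sketch-group");            // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // The consumer is scoped to this block and closed automatically,
        // even if poll() or record handling throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sketch-topic")); // placeholder topic
            while (!Thread.currentThread().isInterrupted()) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}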