summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java65
1 files changed, 24 insertions, 41 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
- // The Kafka consumer used to receive events using Kafka
- private KafkaConsumer<String, String> kafkaConsumer;
-
/**
* {@inheritDoc}.
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
- LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
- }
- kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
-
- // Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
+ kafkaConsumerProperties =
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
}
-
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
- }
+ try (KafkaConsumer<String, String> kafkaConsumer =
+ new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+ kafkaConsumerProperties.getConsumerTopicList());
+ }
- // The endless loop that receives events over Kafka
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- try {
- final ConsumerRecords<String, String> records = kafkaConsumer
- .poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> record : records) {
- traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(new Properties(), record.value());
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ for (final ConsumerRecord<String, String> record : records) {
+ traceIfTraceEnabled(record);
+ eventReceiver.receiveEvent(new Properties(), record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
- } catch (final Exception e) {
- LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
}
-
- if (!consumerThread.isInterrupted()) {
- kafkaConsumer.close();
- }
}
/**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}