summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java25
1 files changed, 13 insertions, 12 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 17738f194..a99258a48 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -71,28 +72,28 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
- + "\" are not applicable to a Kafka consumer");
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
- + "\" are not applicable to a Kafka consumer");
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
- kafkaConsumerProperties =
- (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+ kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
+ .getCarrierTechnologyParameters();
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + kafkaConsumerProperties.getConsumerTopicList());
}
}
@@ -142,14 +143,14 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + kafkaConsumerProperties.getConsumerTopicList());
}
// The endless loop that receives events over Kafka
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
- final ConsumerRecords<String, String> records =
- kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ final ConsumerRecords<String, String> records = kafkaConsumer
+ .poll(kafkaConsumerProperties.getConsumerPollDuration());
for (final ConsumerRecord<String, String> record : records) {
traceIfTraceEnabled(record);
eventReceiver.receiveEvent(new Properties(), record.value());
@@ -172,7 +173,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}