summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java')
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java8
1 files changed, 5 insertions, 3 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index e4e09c8..f60fd53 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -185,9 +185,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+
+ if (fCache != null) {
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+ }
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);