summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java32
1 files changed, 13 insertions, 19 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
index f7f5ba7..6cd0230 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException;
import com.att.dmf.mr.backends.Consumer;
import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -60,8 +59,7 @@ public class Kafka011Consumer implements Consumer {
OPENED, CLOSED
}
- // @Autowired
- // KafkaLiveLockAvoider kafkaLiveLockAvoider;
+
/**
* KafkaConsumer() is constructor. It has following 4 parameters:-
*
@@ -126,7 +124,7 @@ public class Kafka011Consumer implements Consumer {
records = kConsumer.poll(500);
}
for (ConsumerRecord<String, String> record : records) {
- // foundMsgs = true;
+
fPendingMsgs.offer(record);
}
@@ -139,7 +137,7 @@ public class Kafka011Consumer implements Consumer {
}
- // return null;
+
return true;
}
};
@@ -258,7 +256,7 @@ public class Kafka011Consumer implements Consumer {
/**
* setting the kafkaConsumer state to closed
*/
- // public synchronized boolean close() {
+
public boolean close() {
if (getState() == Kafka011Consumer.State.CLOSED) {
@@ -266,7 +264,7 @@ public class Kafka011Consumer implements Consumer {
return true;
}
- // fConnector.shutdown();
+
boolean retVal = kafkaConnectorshuttask();
return retVal;
@@ -280,13 +278,13 @@ public class Kafka011Consumer implements Consumer {
public Boolean call() throws Exception {
try {
- // System.out.println("attempt to delete " + kConsumer);
+
kConsumer.close();
} catch (Exception e) {
log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
- // return false;
+
}
log.info("Kafka connection closure with in 15 seconds by a Executors task");
@@ -360,12 +358,12 @@ public class Kafka011Consumer implements Consumer {
this.state = state;
}
- // private ConsumerConnector fConnector;
+
private final String fTopic;
private final String fGroup;
private final String fId;
private final String fLogTag;
- // private final KafkaStream<byte[], byte[]> fStream;
+
private KafkaConsumer<String, String> kConsumer;
private long fCreateTimeMs;
private long fLastTouch;
@@ -374,7 +372,7 @@ public class Kafka011Consumer implements Consumer {
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
- //private ArrayList<Kafka011Consumer> fconsumerList;
+
@Override
public void commitOffsets() {
if (getState() == Kafka011Consumer.State.CLOSED) {
@@ -382,7 +380,7 @@ public class Kafka011Consumer implements Consumer {
return;
}
kConsumer.commitSync();
- // fConsumer.close();
+
}
@@ -395,9 +393,5 @@ public class Kafka011Consumer implements Consumer {
public void setConsumerCache(KafkaConsumerCache cache) {
}
- //@Override
- //public Message nextMessage(ArrayList<?> l) {
- // TODO Auto-generated method stub
- //return null;
- //}
+
}