diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java | 32 |
1 files changed, 13 insertions, 19 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java index f7f5ba7..6cd0230 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException; import com.att.dmf.mr.backends.Consumer; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -60,8 +59,7 @@ public class Kafka011Consumer implements Consumer { OPENED, CLOSED } - // @Autowired - // KafkaLiveLockAvoider kafkaLiveLockAvoider; + /** * KafkaConsumer() is constructor. It has following 4 parameters:- * @@ -126,7 +124,7 @@ public class Kafka011Consumer implements Consumer { records = kConsumer.poll(500); } for (ConsumerRecord<String, String> record : records) { - // foundMsgs = true; + fPendingMsgs.offer(record); } @@ -139,7 +137,7 @@ public class Kafka011Consumer implements Consumer { } - // return null; + return true; } }; @@ -258,7 +256,7 @@ public class Kafka011Consumer implements Consumer { /** * setting the kafkaConsumer state to closed */ - // public synchronized boolean close() { + public boolean close() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -266,7 +264,7 @@ public class Kafka011Consumer implements Consumer { return true; } - // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); return retVal; @@ -280,13 +278,13 @@ public class Kafka011Consumer implements Consumer { public Boolean call() throws Exception { try { - // System.out.println("attempt to delete " + kConsumer); + kConsumer.close(); } catch (Exception e) { log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - // return false; + } log.info("Kafka connection closure with in 15 seconds by a Executors task"); @@ -360,12 +358,12 @@ public class Kafka011Consumer implements Consumer { this.state = state; } - // private ConsumerConnector fConnector; + private final String fTopic; private final String fGroup; private final String fId; private final String fLogTag; - // private final KafkaStream<byte[], byte[]> fStream; + private KafkaConsumer<String, String> kConsumer; private long fCreateTimeMs; private long fLastTouch; @@ -374,7 +372,7 @@ public class Kafka011Consumer implements Consumer { private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs; - //private ArrayList<Kafka011Consumer> fconsumerList; + @Override public void commitOffsets() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -382,7 +380,7 @@ public class Kafka011Consumer implements Consumer { return; } kConsumer.commitSync(); - // fConsumer.close(); + } @@ -395,9 +393,5 @@ public class Kafka011Consumer implements Consumer { public void setConsumerCache(KafkaConsumerCache cache) { } - //@Override - //public Message nextMessage(ArrayList<?> l) { - // TODO Auto-generated method stub - //return null; - //} + } |