summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt386
1 files changed, 0 insertions, 386 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
deleted file mode 100644
index dd6259f..0000000
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt
+++ /dev/null
@@ -1,386 +0,0 @@
-package com.att.dmf.mr.backends.kafka;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.KafkaException;
-
-import com.att.dmf.mr.backends.Consumer;
-
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-/**
- * A consumer instance that's created per-request. These are stateless so that
- * clients can connect to this service as a proxy.
- *
- * @author peter
- *
- */
-public class KafkaConsumer implements Consumer {
- private enum State {
- OPENED, CLOSED
- }
-
- /**
- * KafkaConsumer() is constructor. It has following 4 parameters:-
- *
- * @param topic
- * @param group
- * @param id
- * @param cc
- *
- */
-
- public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
- fTopic = topic;
- fGroup = group;
- fId = id;
- // fConnector = cc;
-
- fCreateTimeMs = System.currentTimeMillis();
- fLastTouch = fCreateTimeMs;
- fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
- fLogTag = fGroup + "(" + fId + ")/" + fTopic;
- offset = 0;
-
- state = KafkaConsumer.State.OPENED;
-
- // final Map<String, Integer> topicCountMap = new HashMap<String,
- // Integer>();
- // topicCountMap.put(fTopic, 1);
- // log.info(fLogTag +" kafka Consumer started at "
- // +System.currentTimeMillis());
- // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
- // fConnector.createMessageStreams(topicCountMap);
- // final List<KafkaStream<byte[], byte[]>> streams =
- // consumerMap.get(fTopic);
-
- kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
- // System.out.println("I am in Consumer APP " + topic + "-- " +
- // fConsumer);
- kConsumer.subscribe(Arrays.asList(topic));
- log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
- System.out.println("-----id " +id);
-
-
- try { ConsumerRecords<String, String> records =
- kConsumer.poll(500); System.out.println("---" +
- records.count());
-
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s",
- record.offset(), record.key(), record.value()); String t =
- record.value();
-
- }
- }catch(Exception e){
- System.out.println( e);
- }
- System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
- kConsumer.commitSync();
- // fConsumer.close();
-
-
- /*
- * ConsumerRecords<String, String> records = fConsumer.poll(500);
- * System.out.println("---" + records.count());
- *
- * for (ConsumerRecord<String, String> record : records) {
- * System.out.printf("offset = %d, key = %s, value = %s",
- * record.offset(), record.key(), record.value()); String t =
- * record.value();
- *
- * }
- *
- *
- * fConsumer.commitSync(); fConsumer.close();
- */
-
- // fStream = streams.iterator().next();
- }
-
-
-
- private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
- {
- return new Consumer.Message()
- {
- @Override
- public long getOffset ()
- {
- return msg.offset ();
- }
-
- @Override
- public String getMessage ()
- {
- return new String ( msg.value () );
- }
- };
- }
-
- @Override
- public synchronized Consumer.Message nextMessage ()
- {
-
- try
- {
- if ( fPendingMsgs.size () > 0 )
- {
- return makeMessage ( fPendingMsgs.take () );
- }
- }
- catch ( InterruptedException x )
- {
- log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
- }
-
-
- try
- {
- boolean foundMsgs = false;
- System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
- final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
- System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
- for ( ConsumerRecord<String,String> record : records )
- {
- foundMsgs = true;
- fPendingMsgs.offer ( record );
- }
-
- }
- catch ( KafkaException x )
- {
- log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
-
- }
- catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
- {
- log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
-
- }
-
- return null;
- }
-
-
-
- /**
- * getName() method returns string type value. returns 3 parameters in
- * string:- fTopic,fGroup,fId
- *
- * @Override
- */
- public String getName() {
- return fTopic + " : " + fGroup + " : " + fId;
- }
-
- /**
- * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
- * variable value
- *
- * @Override
- *
- */
- public long getCreateTimeMs() {
- return fCreateTimeMs;
- }
-
- public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
- return kConsumer;
- }
-
- /**
- * getLastAccessMs() method returns long type value. returns fLastTouch
- * variable value
- *
- * @Override
- *
- */
- public long getLastAccessMs() {
- return fLastTouch;
- }
-
-
-
- /**
- * getOffset() method returns long type value. returns offset variable value
- *
- * @Override
- *
- */
- public long getOffset() {
- return offset;
- }
-
- /**
- * commit offsets commitOffsets() method will be called on closed of
- * KafkaConsumer.
- *
- * @Override
- *
- *
- * public void commitOffsets() { if (getState() ==
- * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
- * on closed KafkaConsumer " + getName()); return; }
- * fConnector.commitOffsets(); }
- */
-
- /**
- * updating fLastTouch with current time in ms
- */
- public void touch() {
- fLastTouch = System.currentTimeMillis();
- }
-
- /**
- * getLastTouch() method returns long type value. returns fLastTouch
- * variable value
- *
- */
- public long getLastTouch() {
- return fLastTouch;
- }
-
- /**
- * setting the kafkaConsumer state to closed
- */
- public synchronized boolean close() {
-
- if (getState() == KafkaConsumer.State.CLOSED) {
-
- log.warn("close() called on closed KafkaConsumer " + getName());
- return true;
- }
-
- setState(KafkaConsumer.State.CLOSED);
- // fConnector.shutdown();
- boolean retVal = kafkaConnectorshuttask();
- return retVal;
-
- }
-
- /* time out if the kafka shutdown fails for some reason */
-
- private boolean kafkaConnectorshuttask() {
- Callable<Boolean> run = new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- // your code to be timed
- try {
- System.out.println("consumer closing....." + kConsumer);
- kConsumer.close();
- } catch (Exception e) {
- log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
- }
- log.info("Kafka connection closure with in 15 seconds by a Executors task");
- return true;
- }
- };
-
- RunnableFuture future = new FutureTask(run);
- ExecutorService service = Executors.newSingleThreadExecutor();
- service.execute(future);
- Boolean result = null;
- try {
- result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1
- // second
- } catch (TimeoutException ex) {
- // timed out. Try to stop the code if possible.
- log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
- future.cancel(true);
- } catch (Exception ex) {
- // timed out. Try to stop the code if possible.
- log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
- future.cancel(true);
- return false;
- }
- service.shutdown();
- return true;
- }
-
- /**
- * getConsumerGroup() returns Consumer group
- *
- * @return
- */
- public String getConsumerGroup() {
- return fGroup;
- }
-
- /**
- * getConsumerId returns Consumer Id
- *
- * @return
- */
- public String getConsumerId() {
- return fId;
- }
-
- /**
- * getState returns kafkaconsumer state
- *
- * @return
- */
- private KafkaConsumer.State getState() {
- return this.state;
- }
-
- /**
- * setState() sets the kafkaConsumer state
- *
- * @param state
- */
- private void setState(KafkaConsumer.State state) {
- this.state = state;
- }
-
- // private ConsumerConnector fConnector;
- private final String fTopic;
- private final String fGroup;
- private final String fId;
- private final String fLogTag;
- // private final KafkaStream<byte[], byte[]> fStream;
- private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
- private long fCreateTimeMs;
- private long fLastTouch;
- private long offset;
- private KafkaConsumer.State state;
- private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
- private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumer.class);
-
- @Override
- public void commitOffsets() {
- if (getState() == KafkaConsumer.State.CLOSED) {
- log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
- return;
- }
- kConsumer.commitSync();
- // fConsumer.close();
-
- }
-
-
-
- @Override
- public void setOffset(long offsetval) {
- // TODO Auto-generated method stub
- offset = offsetval;
- }
-}