diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt | 386 |
1 files changed, 0 insertions, 386 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt deleted file mode 100644 index dd6259f..0000000 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt +++ /dev/null @@ -1,386 +0,0 @@ -package com.att.dmf.mr.backends.kafka; - -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.KafkaException; - -import com.att.dmf.mr.backends.Consumer; - -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -/** - * A consumer instance that's created per-request. These are stateless so that - * clients can connect to this service as a proxy. - * - * @author peter - * - */ -public class KafkaConsumer implements Consumer { - private enum State { - OPENED, CLOSED - } - - /** - * KafkaConsumer() is constructor. It has following 4 parameters:- - * - * @param topic - * @param group - * @param id - * @param cc - * - */ - - public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception { - fTopic = topic; - fGroup = group; - fId = id; - // fConnector = cc; - - fCreateTimeMs = System.currentTimeMillis(); - fLastTouch = fCreateTimeMs; - fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> (); - fLogTag = fGroup + "(" + fId + ")/" + fTopic; - offset = 0; - - state = KafkaConsumer.State.OPENED; - - // final Map<String, Integer> topicCountMap = new HashMap<String, - // Integer>(); - // topicCountMap.put(fTopic, 1); - // log.info(fLogTag +" kafka Consumer started at " - // +System.currentTimeMillis()); - // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = - // fConnector.createMessageStreams(topicCountMap); - // final List<KafkaStream<byte[], byte[]>> streams = - // consumerMap.get(fTopic); - - kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop); - // System.out.println("I am in Consumer APP " + topic + "-- " + - // fConsumer); - kConsumer.subscribe(Arrays.asList(topic)); - log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); - System.out.println("-----id " +id); - - - try { ConsumerRecords<String, String> records = - kConsumer.poll(500); System.out.println("---" + - records.count()); - - for (ConsumerRecord<String, String> record : records) { - System.out.printf("offset = %d, key = %s, value = %s", - record.offset(), record.key(), record.value()); String t = - record.value(); - - } - }catch(Exception e){ - System.out.println( e); - } - System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); - kConsumer.commitSync(); - // fConsumer.close(); - - - /* - * ConsumerRecords<String, String> records = fConsumer.poll(500); - * System.out.println("---" + records.count()); - * - * for (ConsumerRecord<String, String> record : records) { - * System.out.printf("offset = %d, key = %s, value = %s", - * record.offset(), record.key(), record.value()); String t = - * record.value(); - * - * } - * - * - * fConsumer.commitSync(); fConsumer.close(); - */ - - // fStream = streams.iterator().next(); - } - - - - private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg ) - { - return new Consumer.Message() - { - @Override - public long getOffset () - { - return msg.offset (); - } - - @Override - public String getMessage () - { - return new String ( msg.value () ); - } - }; - } - - @Override - public synchronized Consumer.Message nextMessage () - { - - try - { - if ( fPendingMsgs.size () > 0 ) - { - return makeMessage ( fPendingMsgs.take () ); - } - } - catch ( InterruptedException x ) - { - log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x ); - } - - - try - { - boolean foundMsgs = false; - System.out.println("entering into pollingWWWWWWWWWWWWWWWWW"); - final ConsumerRecords<String,String> records = kConsumer.poll ( 100 ); - System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX...."); - for ( ConsumerRecord<String,String> record : records ) - { - foundMsgs = true; - fPendingMsgs.offer ( record ); - } - - } - catch ( KafkaException x ) - { - log.debug ( fLogTag + ": KafkaException " + x.getMessage () ); - - } - catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x ) - { - log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () ); - - } - - return null; - } - - - - /** - * getName() method returns string type value. returns 3 parameters in - * string:- fTopic,fGroup,fId - * - * @Override - */ - public String getName() { - return fTopic + " : " + fGroup + " : " + fId; - } - - /** - * getCreateTimeMs() method returns long type value. returns fCreateTimeMs - * variable value - * - * @Override - * - */ - public long getCreateTimeMs() { - return fCreateTimeMs; - } - - public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { - return kConsumer; - } - - /** - * getLastAccessMs() method returns long type value. returns fLastTouch - * variable value - * - * @Override - * - */ - public long getLastAccessMs() { - return fLastTouch; - } - - - - /** - * getOffset() method returns long type value. returns offset variable value - * - * @Override - * - */ - public long getOffset() { - return offset; - } - - /** - * commit offsets commitOffsets() method will be called on closed of - * KafkaConsumer. - * - * @Override - * - * - * public void commitOffsets() { if (getState() == - * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called - * on closed KafkaConsumer " + getName()); return; } - * fConnector.commitOffsets(); } - */ - - /** - * updating fLastTouch with current time in ms - */ - public void touch() { - fLastTouch = System.currentTimeMillis(); - } - - /** - * getLastTouch() method returns long type value. returns fLastTouch - * variable value - * - */ - public long getLastTouch() { - return fLastTouch; - } - - /** - * setting the kafkaConsumer state to closed - */ - public synchronized boolean close() { - - if (getState() == KafkaConsumer.State.CLOSED) { - - log.warn("close() called on closed KafkaConsumer " + getName()); - return true; - } - - setState(KafkaConsumer.State.CLOSED); - // fConnector.shutdown(); - boolean retVal = kafkaConnectorshuttask(); - return retVal; - - } - - /* time out if the kafka shutdown fails for some reason */ - - private boolean kafkaConnectorshuttask() { - Callable<Boolean> run = new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - // your code to be timed - try { - System.out.println("consumer closing....." + kConsumer); - kConsumer.close(); - } catch (Exception e) { - log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - } - log.info("Kafka connection closure with in 15 seconds by a Executors task"); - return true; - } - }; - - RunnableFuture future = new FutureTask(run); - ExecutorService service = Executors.newSingleThreadExecutor(); - service.execute(future); - Boolean result = null; - try { - result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1 - // second - } catch (TimeoutException ex) { - // timed out. Try to stop the code if possible. - log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task"); - future.cancel(true); - } catch (Exception ex) { - // timed out. Try to stop the code if possible. - log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex); - future.cancel(true); - return false; - } - service.shutdown(); - return true; - } - - /** - * getConsumerGroup() returns Consumer group - * - * @return - */ - public String getConsumerGroup() { - return fGroup; - } - - /** - * getConsumerId returns Consumer Id - * - * @return - */ - public String getConsumerId() { - return fId; - } - - /** - * getState returns kafkaconsumer state - * - * @return - */ - private KafkaConsumer.State getState() { - return this.state; - } - - /** - * setState() sets the kafkaConsumer state - * - * @param state - */ - private void setState(KafkaConsumer.State state) { - this.state = state; - } - - // private ConsumerConnector fConnector; - private final String fTopic; - private final String fGroup; - private final String fId; - private final String fLogTag; - // private final KafkaStream<byte[], byte[]> fStream; - private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer; - private long fCreateTimeMs; - private long fLastTouch; - private long offset; - private KafkaConsumer.State state; - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); - private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs; - // private static final Logger log = - // LoggerFactory.getLogger(KafkaConsumer.class); - - @Override - public void commitOffsets() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); - return; - } - kConsumer.commitSync(); - // fConsumer.close(); - - } - - - - @Override - public void setOffset(long offsetval) { - // TODO Auto-generated method stub - offset = offsetval; - } -} |