summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt386
1 files changed, 386 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt
new file mode 100644
index 0000000..dd6259f
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt
@@ -0,0 +1,386 @@
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ *
+ * @author peter
+ *
+ */
+public class KafkaConsumer implements Consumer {
+ private enum State {
+ OPENED, CLOSED
+ }
+
+ /**
+ * KafkaConsumer() is constructor. It has following 4 parameters:-
+ *
+ * @param topic
+ * @param group
+ * @param id
+ * @param cc
+ *
+ */
+
+ public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
+ fTopic = topic;
+ fGroup = group;
+ fId = id;
+ // fConnector = cc;
+
+ fCreateTimeMs = System.currentTimeMillis();
+ fLastTouch = fCreateTimeMs;
+ fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
+ fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+ offset = 0;
+
+ state = KafkaConsumer.State.OPENED;
+
+ // final Map<String, Integer> topicCountMap = new HashMap<String,
+ // Integer>();
+ // topicCountMap.put(fTopic, 1);
+ // log.info(fLogTag +" kafka Consumer started at "
+ // +System.currentTimeMillis());
+ // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+ // fConnector.createMessageStreams(topicCountMap);
+ // final List<KafkaStream<byte[], byte[]>> streams =
+ // consumerMap.get(fTopic);
+
+ kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
+ // System.out.println("I am in Consumer APP " + topic + "-- " +
+ // fConsumer);
+ kConsumer.subscribe(Arrays.asList(topic));
+ log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ System.out.println("-----id " +id);
+
+
+ try { ConsumerRecords<String, String> records =
+ kConsumer.poll(500); System.out.println("---" +
+ records.count());
+
+ for (ConsumerRecord<String, String> record : records) {
+ System.out.printf("offset = %d, key = %s, value = %s",
+ record.offset(), record.key(), record.value()); String t =
+ record.value();
+
+ }
+ }catch(Exception e){
+ System.out.println( e);
+ }
+ System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+
+ /*
+ * ConsumerRecords<String, String> records = fConsumer.poll(500);
+ * System.out.println("---" + records.count());
+ *
+ * for (ConsumerRecord<String, String> record : records) {
+ * System.out.printf("offset = %d, key = %s, value = %s",
+ * record.offset(), record.key(), record.value()); String t =
+ * record.value();
+ *
+ * }
+ *
+ *
+ * fConsumer.commitSync(); fConsumer.close();
+ */
+
+ // fStream = streams.iterator().next();
+ }
+
+
+
+ private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
+ {
+ return new Consumer.Message()
+ {
+ @Override
+ public long getOffset ()
+ {
+ return msg.offset ();
+ }
+
+ @Override
+ public String getMessage ()
+ {
+ return new String ( msg.value () );
+ }
+ };
+ }
+
+ @Override
+ public synchronized Consumer.Message nextMessage ()
+ {
+
+ try
+ {
+ if ( fPendingMsgs.size () > 0 )
+ {
+ return makeMessage ( fPendingMsgs.take () );
+ }
+ }
+ catch ( InterruptedException x )
+ {
+ log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
+ }
+
+
+ try
+ {
+ boolean foundMsgs = false;
+ System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
+ final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
+ System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
+ for ( ConsumerRecord<String,String> record : records )
+ {
+ foundMsgs = true;
+ fPendingMsgs.offer ( record );
+ }
+
+ }
+ catch ( KafkaException x )
+ {
+ log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
+
+ }
+ catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
+ {
+ log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
+
+ }
+
+ return null;
+ }
+
+
+
+ /**
+ * getName() method returns string type value. returns 3 parameters in
+ * string:- fTopic,fGroup,fId
+ *
+ * @Override
+ */
+ public String getName() {
+ return fTopic + " : " + fGroup + " : " + fId;
+ }
+
+ /**
+ * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getCreateTimeMs() {
+ return fCreateTimeMs;
+ }
+
+ public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
+ return kConsumer;
+ }
+
+ /**
+ * getLastAccessMs() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ * @Override
+ *
+ */
+ public long getLastAccessMs() {
+ return fLastTouch;
+ }
+
+
+
+ /**
+ * getOffset() method returns long type value. returns offset variable value
+ *
+ * @Override
+ *
+ */
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * commit offsets commitOffsets() method will be called on closed of
+ * KafkaConsumer.
+ *
+ * @Override
+ *
+ *
+ * public void commitOffsets() { if (getState() ==
+ * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+ * on closed KafkaConsumer " + getName()); return; }
+ * fConnector.commitOffsets(); }
+ */
+
+ /**
+ * updating fLastTouch with current time in ms
+ */
+ public void touch() {
+ fLastTouch = System.currentTimeMillis();
+ }
+
+ /**
+ * getLastTouch() method returns long type value. returns fLastTouch
+ * variable value
+ *
+ */
+ public long getLastTouch() {
+ return fLastTouch;
+ }
+
+ /**
+ * setting the kafkaConsumer state to closed
+ */
+ public synchronized boolean close() {
+
+ if (getState() == KafkaConsumer.State.CLOSED) {
+
+ log.warn("close() called on closed KafkaConsumer " + getName());
+ return true;
+ }
+
+ setState(KafkaConsumer.State.CLOSED);
+ // fConnector.shutdown();
+ boolean retVal = kafkaConnectorshuttask();
+ return retVal;
+
+ }
+
+ /* time out if the kafka shutdown fails for some reason */
+
+ private boolean kafkaConnectorshuttask() {
+ Callable<Boolean> run = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ // your code to be timed
+ try {
+ System.out.println("consumer closing....." + kConsumer);
+ kConsumer.close();
+ } catch (Exception e) {
+ log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+ }
+ log.info("Kafka connection closure with in 15 seconds by a Executors task");
+ return true;
+ }
+ };
+
+ RunnableFuture future = new FutureTask(run);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.execute(future);
+ Boolean result = null;
+ try {
+ result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1
+ // second
+ } catch (TimeoutException ex) {
+ // timed out. Try to stop the code if possible.
+ log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
+ future.cancel(true);
+ } catch (Exception ex) {
+ // timed out. Try to stop the code if possible.
+ log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
+ future.cancel(true);
+ return false;
+ }
+ service.shutdown();
+ return true;
+ }
+
+ /**
+ * getConsumerGroup() returns Consumer group
+ *
+ * @return
+ */
+ public String getConsumerGroup() {
+ return fGroup;
+ }
+
+ /**
+ * getConsumerId returns Consumer Id
+ *
+ * @return
+ */
+ public String getConsumerId() {
+ return fId;
+ }
+
+ /**
+ * getState returns kafkaconsumer state
+ *
+ * @return
+ */
+ private KafkaConsumer.State getState() {
+ return this.state;
+ }
+
+ /**
+ * setState() sets the kafkaConsumer state
+ *
+ * @param state
+ */
+ private void setState(KafkaConsumer.State state) {
+ this.state = state;
+ }
+
+ // private ConsumerConnector fConnector;
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final String fLogTag;
+ // private final KafkaStream<byte[], byte[]> fStream;
+ private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
+ private long fCreateTimeMs;
+ private long fLastTouch;
+ private long offset;
+ private KafkaConsumer.State state;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
+ private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
+ // private static final Logger log =
+ // LoggerFactory.getLogger(KafkaConsumer.class);
+
+ @Override
+ public void commitOffsets() {
+ if (getState() == KafkaConsumer.State.CLOSED) {
+ log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+ return;
+ }
+ kConsumer.commitSync();
+ // fConsumer.close();
+
+ }
+
+
+
+ @Override
+ public void setOffset(long offsetval) {
+ // TODO Auto-generated method stub
+ offset = offsetval;
+ }
+}